handle errors properly #9

Closed
celesteking opened this Issue Apr 23, 2011 · 6 comments


It doesn't handle errors returned from the broker. The script finishes successfully when it should alert like hell. I also wonder why the broker sent that error, since all access params were configured correctly. tcpdump attached.

Here's the code:

client = OnStomp.connect('stomp://xxx:6163', opts)

scope = client.with_receipt do |r|
  puts "Got my receipt!"
end

scope.send '/queue/testo', 'walks in to the room'

01:13:09.987833 IP 192.168.88.102.46657 > xxx.6163: S 945257220:945257220(0) win 5840
E..<..@.@.._..Xf.V.Q.A..8W{.................... ............
01:13:10.133726 IP xxx.6163 > 192.168.88.102.46657: S 1950693974:1950693974(0) ack 945257221 win 5840
E..0..@.5....V.Q..Xf...AtE:V8W{.p....;..........
01:13:10.133763 IP 192.168.88.102.46657 > xxx.6163: . ack 1 win 5840
E..(..@.@..r..Xf.V.Q.A..8W{.tE:WP.......
01:13:10.134141 IP 192.168.88.102.46657 > xxx.6163: P 1:100(99) ack 1 win 5840
E.....@.@.....Xf.V.Q.A..8W{.tE:WP....H..CONNECT
login:tester
heart-beat:0,0
passcode:sdfsdf
host:broker.yyy
accept-version:1.0,1.1

.
01:13:10.279916 IP xxx.6163 > 192.168.88.102.46657: . ack 100 win 5840
E..(v.@.5.@@.V.Q..Xf...AtE:W8W{hP............J
01:13:10.286294 IP xxx.6163 > 192.168.88.102.46657: P 1:67(66) ack 100 win 5840
E..jv.@.5.?..V.Q..Xf...AtE:W8W{hP...=...CONNECTED
session:ID:broker.yyy-49796-1303594934541-4:58

.

01:13:10.286309 IP 192.168.88.102.46657 > xxx.6163: . ack 67 win 5840
E..(..@.@..p..Xf.V.Q.A..8W{htE:.P.......
01:13:10.387027 IP 192.168.88.102.46657 > xxx.6163: P 100:180(80) ack 67 win 5840
E..x..@.@.....Xf.V.Q.A..8W{htE:.P....T..SEND
destination:/queue/testo
receipt:1
content-length:20

walks in to the room.
01:13:10.535728 IP xxx.6163 > 192.168.88.102.46657: . 67:1527(1460) ack 180 win 5840
E...v.@.5.:..V.Q..Xf...AtE:.8W{.P....3..ERROR
message:User tester is not authorized to write to: queue://testo
receipt-id:1

java.lang.SecurityException: User tester is not authorized to write to: queue://testo
at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:187)
at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:460)
at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:663)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:309)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:140)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:253)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:178)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:70)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTrans
01:13:10.535775 IP 192.168.88.102.46657 > xxx.6163: . ack 1527 win 8760
E..(..@.@..n..Xf.V.Q.A..8W{.tE@MP."8....
01:13:10.535784 IP xxx.6163 > 192.168.88.102.46657: P 1527:1651(124) ack 180 win 5840
E...v.@.5.?..V.Q..Xf...AtE@M8W{.P.......)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:201)
at java.lang.Thread.run(Thread.java:662)

Owner

iande commented Apr 24, 2011

To get feedback on ERROR frames sent from the broker, you'll need to set up a handler:

client.on_error do |error_frame|
  STDERR.puts "Error from broker: #{error_frame.body}"
end

Neither the STOMP 1.0 nor STOMP 1.1 protocols provide an automated means of relating an ERROR frame sent by the broker to a frame sent by the client, so any ERROR frame handling has to be done manually. The only real exception to this is in the CONNECT / CONNECTED frame exchange. If the broker responds to a CONNECT frame with anything other than CONNECTED, it's considered a protocol error, and that should raise an exception in the OnStomp::Client#connect method.
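As the capture in this thread shows, ActiveMQ happened to echo a receipt-id header on its ERROR frame, so the correlation can be done by hand. The following is a minimal sketch in plain Ruby and is not part of OnStomp: `ReceiptLedger` and its methods are hypothetical names, and frame headers are modelled as plain hashes. In a real client, the `handle_error` call would presumably sit inside the `client.on_error` block and read `error_frame['receipt-id']`.

```ruby
# Hypothetical helper: remembers what was sent under each receipt id so an
# ERROR frame carrying the same id can be traced back to its cause.
class ReceiptLedger
  def initialize
    @pending = {}
    @lock = Mutex.new
  end

  # Record a description of a frame before sending it with a receipt header.
  def track(receipt_id, description)
    @lock.synchronize { @pending[receipt_id] = description }
  end

  # Call from a RECEIPT callback: the frame was accepted by the broker.
  def handle_receipt(headers)
    @lock.synchronize { @pending.delete(headers['receipt-id']) }
  end

  # Call from an ERROR callback. Returns a readable report and falls back to
  # an uncorrelated report when the broker did not echo a receipt-id.
  def handle_error(headers)
    id = headers['receipt-id']
    sent = @lock.synchronize { @pending.delete(id) }
    if sent
      "#{sent} failed: #{headers['message']}"
    else
      "uncorrelated error: #{headers['message']}"
    end
  end

  def pending_count
    @lock.synchronize { @pending.size }
  end
end

ledger = ReceiptLedger.new
ledger.track('1', 'SEND to /queue/testo')

# Headers copied from the ERROR frame in the capture above.
error_headers = {
  'message' => 'User tester is not authorized to write to: queue://testo',
  'receipt-id' => '1'
}
report = ledger.handle_error(error_headers)
puts report
```

Because not every broker is guaranteed to echo the receipt id, the sketch keeps the uncorrelated branch rather than assuming a match.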

Try setting up an error handler and let me know if the error frame comes through.

Thanks for the feedback.

@ghost ghost assigned iande Apr 24, 2011

Thanks for explaining. Well, this sucks then. At least ActiveMQ sends a receipt-id that can be used to correlate the answer with the request. But I wonder why this gem doesn't provide synchronous communication: I tried to use receipts, but it didn't wait for them. It just carried on through the code, calling client.disconnect and terminating the application.

client.connect

scope = client.with_receipt do |r|
  puts "Got my receipt!"
end
scope.send '/queue/testo', 'walks in to the room'

client.send '/queue/testo', 'Did you get this?' do |r|
  puts "Got my receipt: #{r['receipt-id']}"
end

client.on_error do |error_frame, client|
  STDERR.puts "Error from broker: #{error_frame.body}"
end

puts 'ready to finish'
client.disconnect

pp Thread.list
Thread.list.each {|th| th.join if th != Thread.current}

with output:

/usr/bin/ruby -e $stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift) /on/mq/first.rb
ready to finish
[#<Thread:0x7f7da09a4370 run>]

I thought scope.send would send the message synchronously - but it didn't wait for it. What's the right usage there?

Also, Thread.list shows only the main thread, so I assume Client#disconnect kills all workers, even if there are "waiters" for a receipt.

Owner

iande commented Apr 24, 2011

It is a little tricky to deal with. You are correct: all of the IO is non-blocking and done on a separate thread, so none of the methods will block except Client#disconnect.

Client#disconnect does terminate the worker thread, but not by outright killing it. It tells the IO thread to write out everything in its buffer and then send a DISCONNECT frame.

A quick way to get your RECEIPT response would be to add a receipt header to your disconnect call:

client.disconnect :receipt => "rcpt-disconnect"

(You could also use a with_receipt block)

This will still flush the write buffer, but the connection will not be closed until a RECEIPT frame comes back with receipt-id: rcpt-disconnect.

I'm sorry for the confusion. I should probably make it more explicit in the docs that this library is almost entirely asynchronous, which requires a different approach to developing clients. However, for simple use cases where you have producers and consumers, it's pretty straightforward. By the time client.disconnect is called, every frame you've written will be sent. On the consumer side, if your consumers subscribe and loop forever, you don't have to do anything more: the IO thread will read and process the data sent to it while the main thread loops.

For mixed operations and situations where you want to ensure your frames were all properly receipted, you'll need to approach things differently since everything is asynchronous. Another simple (and more correct) modification to your code would be:

client.connect

receipt_count = 0

scope = client.with_receipt do |r|
  puts "Got my receipt!"
  receipt_count += 1
end
scope.send '/queue/testo', 'walks in to the room'

client.send '/queue/testo', 'Did you get this?' do |r|
  puts "Got my receipt: #{r['receipt-id']}"
  receipt_count += 1
end

client.on_error do |error_frame, client|
  STDERR.puts "Error from broker: #{error_frame.body}"
end

Thread.pass while receipt_count < 2

puts 'ready to finish'
client.disconnect
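One caveat with the busy-wait above: if the broker answers a frame with ERROR rather than RECEIPT, as it did in the original report, receipt_count would never reach 2 and the loop would presumably spin forever. A hedged alternative is to block on a condition variable with a deadline. The sketch below is plain Ruby and does not touch OnStomp. `ReceiptLatch`, `count_down`, and `wait` are made-up names, and the extra thread only stands in for the IO thread that would invoke the receipt callbacks.

```ruby
# Hypothetical latch: the main thread waits until `count` receipts have been
# reported, or until the timeout elapses, whichever comes first.
class ReceiptLatch
  def initialize(count)
    @remaining = count
    @lock = Mutex.new
    @done = ConditionVariable.new
  end

  # Call once per RECEIPT callback.
  def count_down
    @lock.synchronize do
      @remaining -= 1 if @remaining > 0
      @done.broadcast if @remaining.zero?
    end
  end

  # Returns true if every receipt arrived in time, false on timeout.
  def wait(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      while @remaining > 0
        left = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if left <= 0
        @done.wait(@lock, left)
      end
      true
    end
  end
end

# Both receipts arrive: the stand-in "IO thread" counts down twice.
latch = ReceiptLatch.new(2)
io_thread = Thread.new { 2.times { latch.count_down } }
all_receipted = latch.wait(1.0)
io_thread.join

# One receipt never arrives (for example, the broker sent ERROR instead).
stalled = ReceiptLatch.new(2)
stalled.count_down
timed_out = !stalled.wait(0.1)

puts "all receipted: #{all_receipted}, stalled latch timed out: #{timed_out}"
```

In the code above, each receipt block would call `count_down` instead of incrementing a counter, and the main thread would call `wait` with a timeout before disconnecting.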

It's a different sort of approach, and I admit it's not everyone's cup of tea. You might like the stomp gem's approach more. I'm not sure if you can make frames and their receipts synchronous, but with the last version I looked at, all of the client frame methods (#send, #subscribe, etc) were blocking at least.

I guess my docs need more work than I thought :)

One thing to note in the above code is that you don't have to worry about mutex synchronization when updating receipt_count, as both of those events are dispatched from the same thread and will be dispatched consecutively, never concurrently. Right now there are a few instances where you'd have to be more careful: calling client.send ... triggers the before_transmitting and before_send events within the same thread that client.send was called in, whereas after_transmitting and on_send are called within the IO processing thread. One of the changes I'm working on is putting all event dispatch into its own thread, so variables can be safely modified in any event callback without having to worry about crap like that.
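The idea of running every callback on a single dispatch thread can be illustrated without OnStomp. This is only a sketch under assumptions: `EventDispatcher`, `on`, `trigger`, and `shutdown` are hypothetical names, not the library's API. Events from any thread are pushed onto a Queue, and one thread runs the callbacks one at a time, so a callback can update a shared counter without a mutex.

```ruby
# Hypothetical dispatcher: callbacks always run on one dedicated thread,
# one after another, regardless of which thread triggered the event.
class EventDispatcher
  STOP = Object.new

  def initialize
    @handlers = Hash.new { |hash, name| hash[name] = [] }
    @queue = Queue.new
    @thread = Thread.new { run }
  end

  # Register handlers before triggering events; registration itself is not
  # synchronized in this sketch.
  def on(name, &block)
    @handlers[name] << block
  end

  # Safe to call from any thread: only enqueues, never runs callbacks inline.
  def trigger(name, payload)
    @queue << [name, payload]
  end

  # Process everything queued so far, then stop the dispatch thread.
  def shutdown
    @queue << STOP
    @thread.join
  end

  private

  def run
    loop do
      event = @queue.pop
      break if event.equal?(STOP)
      name, payload = event
      @handlers.fetch(name, []).each { |handler| handler.call(payload) }
    end
  end
end

dispatcher = EventDispatcher.new
receipt_count = 0
dispatch_threads = []

dispatcher.on(:receipt) do |_id|
  receipt_count += 1 # no mutex: only the dispatch thread runs this block
  dispatch_threads << Thread.current
end

# Several producer threads stand in for the main and IO threads.
producers = 4.times.map do |i|
  Thread.new { 25.times { |j| dispatcher.trigger(:receipt, "#{i}-#{j}") } }
end
producers.each(&:join)
dispatcher.shutdown

puts "receipts: #{receipt_count}, dispatch threads: #{dispatch_threads.uniq.size}"
```

The main thread reads receipt_count only after `shutdown` has joined the dispatch thread, which is what makes the unsynchronized read safe here.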

Owner

iande commented Apr 24, 2011

If you have any other questions or comments, feel free to shoot me a message. And, of course, if you find any other bugs feel free to open another issue here.

Thanks again for your input. It's given me an idea for more specialized error handling.

@iande iande closed this Apr 24, 2011

Never seen such a helpful open-source developer! ;) Huge thanks for the help.

Owner

iande commented Jul 6, 2011

No problem, glad to help. If there's more interest in synchronous behavior in the future, I may implement that functionality. However, I'm not sure how much use this library is seeing at the moment, so it's difficult for me to commit to future features that don't just serve myself :)
