Skip to content

Commit

Permalink
add confirm handler registration
Browse files Browse the repository at this point in the history
  • Loading branch information
Electron-libre committed Feb 17, 2019
1 parent 10c50e9 commit 97f7e3f
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
52 changes: 52 additions & 0 deletions lib/amqp/confirm.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,56 @@ defmodule AMQP.Confirm do
def wait_for_confirms_or_die(%Channel{pid: pid}, timeout) do
:amqp_channel.wait_for_confirms_or_die(pid, timeout)
end

@doc """
On channel with confirm activated, return the next message sequence number.
To use in combination with `register_handler/2`
"""
@spec next_publish_seqno(Channel.t) :: non_neg_integer
def next_publish_seqno(%Channel{pid: pid}) do
:amqp_channel.next_publish_seqno(pid)
end

@doc """
Register a handler for confirms on channel.
The handler will receive either:
* `{:basic_ack, seqno, multiple}`
* `{:basic_nack, seqno, multiple}`
The `seqno` (delivery_tag) is an integer, the sequence number of the message.
`multiple` is a boolean, when `true` means multiple messages confirm, upto `seqno`.
see https://www.rabbitmq.com/confirms.html
"""
@spec register_handler(Channel.t, pid) :: :ok
def register_handler(%Channel{pid: pid}, handler_pid) do
adapter_pid = spawn fn ->
Process.flag(:trap_exit, true)
Process.monitor(handler_pid)
Process.monitor(pid)
handle_confirm(handler_pid)
end
:amqp_channel.register_confirm_handler(pid, adapter_pid)
end

@spec unregister_handler(Channel.t) :: :ok
def unregister_handler(%Channel{pid: pid}) do
:amqp_channel.unregister_confirm_handler(pid)
end

defp handle_confirm(handler_pid) do
receive do
basic_ack(delivery_tag: delivery_tag, multiple: multiple) ->
send(handler_pid, {:basic_ack, delivery_tag, multiple})
handle_confirm(handler_pid)

basic_nack(delivery_tag: delivery_tag, multiple: multiple) ->
send(handler_pid, {:basic_nack, delivery_tag, multiple})
handle_confirm(handler_pid)

{:DOWN, _ref, :process, _pid, reason} ->
exit(reason)
end
end
end

46 changes: 46 additions & 0 deletions test/confirm_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule ConfirmTest do
use ExUnit.Case

alias AMQP.Connection
alias AMQP.Channel
alias AMQP.Confirm

setup do
{:ok, conn} = Connection.open
{:ok, chan} = Channel.open(conn)
:ok = Confirm.select(chan)
on_exit fn ->
:ok = Connection.close(conn)
end
{:ok, chan: chan}
end

describe "next_publish_seqno" do
test "returns 1 whe no messages where sent", ctx do
assert 1 == Confirm.next_publish_seqno(ctx[:chan])
end
end

describe "register_handler" do
test "handler receive confirm with message seqno", ctx do
seq_no = Confirm.next_publish_seqno(ctx[:chan])
:ok = AMQP.Basic.publish(ctx[:chan], "", "", "foo")
:ok = Confirm.register_handler(ctx[:chan], self())
assert_receive {:basic_ack, seq_no, false}
:ok = Confirm.unregister_handler(ctx[:chan])
end
end

describe "unregister_handler" do
setup ctx do
:ok = Confirm.register_handler(ctx[:chan], self())
{:ok, ctx}
end

test "handler no more receive confirm", ctx do
:ok = Confirm.unregister_handler(ctx[:chan])
:ok = AMQP.Basic.publish(ctx[:chan], "", "", "foo")
refute_receive {:basic_ack, 1, false}
end
end
end

0 comments on commit 97f7e3f

Please sign in to comment.