Skip to content

Commit

Permalink
Cancel outgoing inflight packages when reconnecting \w clean session
Browse files Browse the repository at this point in the history
  • Loading branch information
gausby committed Aug 19, 2018
1 parent 3688c57 commit 68e3445
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 6 deletions.
10 changes: 4 additions & 6 deletions lib/tortoise/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,8 @@ defmodule Tortoise.Connection do

@impl true
def handle_info(:connect, state) do
state =
state
# make sure we will not fall for a keep alive timeout while we reconnect
|> cancel_keep_alive()
# make sure we will not fall for a keep alive timeout while we reconnect
state = cancel_keep_alive(state)

with {%Connack{status: :accepted} = connack, socket} <-
do_connect(state.server, state.connect),
Expand All @@ -261,8 +259,8 @@ defmodule Tortoise.Connection do
{:noreply, state}

%Connack{session_present: false} ->
# delete inflight state ?
if not Enum.empty?(state.subscriptions), do: send(self(), :subscribe)
:ok = Inflight.reset(state.client_id)
unless Enum.empty?(state.subscriptions), do: send(self(), :subscribe)
{:noreply, state}
end
else
Expand Down
17 changes: 17 additions & 0 deletions lib/tortoise/connection/inflight.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Tortoise.Connection.Inflight do
GenStateMachine.stop(via_name(client_id))
end

@doc false
def track(client_id, {:incoming, %Package.Publish{qos: qos} = publish})
when qos in 1..2 do
:ok = GenStateMachine.cast(via_name(client_id), {:incoming, publish})
Expand All @@ -49,6 +50,7 @@ defmodule Tortoise.Connection.Inflight do
end
end

@doc false
def track_sync(client_id, {:outgoing, _} = command, timeout \\ :infinity) do
{:ok, ref} = track(client_id, command)

Expand All @@ -60,10 +62,16 @@ defmodule Tortoise.Connection.Inflight do
end
end

@doc false
def update(client_id, {_, %{__struct__: _, identifier: _identifier}} = event) do
:ok = GenStateMachine.cast(via_name(client_id), {:update, event})
end

@doc false
def reset(client_id) do
:ok = GenStateMachine.cast(via_name(client_id), :reset)
end

# Server callbacks
@impl true
def init(opts) do
Expand Down Expand Up @@ -225,6 +233,15 @@ defmodule Tortoise.Connection.Inflight do
end
end

def handle_event(:cast, :reset, _, %State{pending: pending} = data) do
# cancel all currently outgoing messages
for {_, %Track{polarity: :negative, caller: {pid, ref}}} <- pending do
send(pid, {{Tortoise, data.client_id}, ref, {:error, :canceled}})
end

{:keep_state, %State{data | pending: %{}, order: []}}
end

# We trap the incoming QoS 2 packages in the inflight manager so we
# can make sure we will not onward them to the connection handler
# more than once.
Expand Down
6 changes: 6 additions & 0 deletions lib/tortoise/connection/inflight/track.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ defmodule Tortoise.Connection.Inflight.Track do
@opaque t :: %__MODULE__{
polarity: :positive | :negative,
type: package,
caller: {pid(), reference()} | nil,
identifier: package_identifier(),
status: [status_update()],
pending: [next_action()]
}
@enforce_keys [:type, :identifier, :polarity, :pending]
defstruct type: nil,
polarity: nil,
caller: nil,
identifier: nil,
status: [],
pending: []
Expand Down Expand Up @@ -108,6 +110,7 @@ defmodule Tortoise.Connection.Inflight.Track do
%State{
type: Package.Publish,
polarity: :negative,
caller: {pid, ref},
identifier: id,
pending: [
[
Expand Down Expand Up @@ -146,6 +149,7 @@ defmodule Tortoise.Connection.Inflight.Track do
%State{
type: Package.Publish,
polarity: :negative,
caller: {pid, ref},
identifier: id,
pending: [
[
Expand All @@ -170,6 +174,7 @@ defmodule Tortoise.Connection.Inflight.Track do
%State{
type: Package.Subscribe,
polarity: :negative,
caller: {pid, ref},
identifier: id,
pending: [
[
Expand All @@ -189,6 +194,7 @@ defmodule Tortoise.Connection.Inflight.Track do
%State{
type: Package.Unsubscribe,
polarity: :negative,
caller: {pid, ref},
identifier: id,
pending: [
[
Expand Down
12 changes: 12 additions & 0 deletions test/tortoise/connection/inflight_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,16 @@ defmodule Tortoise.Connection.InflightTest do
assert {:ok, ^expected} = :gen_tcp.recv(context.server, byte_size(expected), 500)
end
end

describe "resetting" do
setup [:setup_connection, :setup_inflight]

test "cancel outgoing inflight packages", %{client_id: client_id} do
publish = %Package.Publish{identifier: 1, topic: "foo", qos: 1}
{:ok, ref} = Inflight.track(client_id, {:outgoing, publish})
:ok = Inflight.reset(client_id)
# the calling process should get a result response
assert_receive {{Tortoise, ^client_id}, ^ref, {:error, :canceled}}
end
end
end

0 comments on commit 68e3445

Please sign in to comment.