Skip to content

Commit

Permalink
Merge pull request #39 from unisontech/apn_queue
Browse files Browse the repository at this point in the history
APNS resend queue
  • Loading branch information
igaray committed Jan 5, 2015
2 parents 925dc79 + f8072bc commit 7b7153e
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 1 deletion.
9 changes: 8 additions & 1 deletion src/apns_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
in_socket :: tuple(),
connection :: apns:connection(),
in_buffer = <<>> :: binary(),
out_buffer = <<>> :: binary()}).
out_buffer = <<>> :: binary(),
queue :: pid()}).
-type state() :: #state{}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand Down Expand Up @@ -68,12 +69,14 @@ build_payload(Msg) ->
-spec init(apns:connection()) -> {ok, state()} | {stop, term()}.
init(Connection) ->
try
{ok, QID} = apns_queue:start_link(),
case open_out(Connection) of
{ok, OutSocket} -> case open_feedback(Connection) of
{ok, InSocket} ->
{ok, #state{ out_socket = OutSocket
, in_socket = InSocket
, connection = Connection
, queue = QID
}};
{error, Reason} -> {stop, Reason}
end;
Expand Down Expand Up @@ -156,11 +159,13 @@ handle_cast(Msg, State) when is_record(Msg, apns_msg) ->
Socket = State#state.out_socket,
Payload = build_payload(Msg),
BinToken = hexstr_to_bin(Msg#apns_msg.device_token),
apns_queue:in(State#state.queue, Msg),
case send_payload(
Socket, Msg#apns_msg.id, Msg#apns_msg.expiry, BinToken, Payload, Msg#apns_msg.priority) of
ok ->
{noreply, State};
{error, Reason} ->
apns_queue:fail(State#state.queue, Msg#apns_msg.id),
{stop, {error, Reason}, State}
end;

Expand All @@ -181,6 +186,8 @@ handle_info( {ssl, SslSocket, Data}
case Command of
8 -> %% Error
Status = parse_status(StatusCode),
{_MsgFailed, RestMsg} = apns_queue:fail(State#state.queue, MsgId),
[send_message(self(), M) || M <- RestMsg],
try Error(MsgId, Status) of
stop -> throw({stop, {msg_error, MsgId, Status}, State});
_ -> noop
Expand Down
122 changes: 122 additions & 0 deletions src/apns_queue.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
%%%-------------------------------------------------------------------
%%% @author Manuel Rubio <manuel@altenwald.com>
%%% @copyright (C) 2014 Altenwald Solutions, S.L.
%%% @doc apns4erl queue for recover in failure
%%% @end
%%%-------------------------------------------------------------------
-module(apns_queue).
-author('Manuel Rubio <manuel@altenwald.com>').

-include("apns.hrl").

-behaviour(gen_server).

-define(DEFAULT_MAX_ENTRIES, 1000).

-export([
start_link/0,
stop/1,

in/2,
fail/2,

% callbacks
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).

-record(state, {
queue = queue:new() :: queue(),
max_entries = ?DEFAULT_MAX_ENTRIES :: pos_integer()
}).

-type state() :: #state{}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Public API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc Stops the connection
-spec stop(QID :: pid()) -> ok.
stop(QID) ->
gen_server:cast(QID, stop).

-spec in(QID :: pid(), Msg :: apns:msg()) -> ok.
in(QID, Msg) ->
gen_server:cast(QID, {in, Msg}).

-spec fail(QID :: pid(), ID :: binary()) -> [apns:msg()].
fail(QID, ID) ->
gen_server:call(QID, {fail, ID}).

%% @hidden
-spec start_link() -> {ok, pid()} | {error, {already_started, pid()}}.
start_link() ->
gen_server:start_link(?MODULE, [], []).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Server implementation, a.k.a.: callbacks
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @hidden
-spec init([]) -> {ok, state()}.
init([]) ->
{ok, #state{}}.

%% @hidden
-spec handle_cast(stop | term(), state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
handle_cast(stop, State) ->
{stop, normal, State};

handle_cast({in, Msg}, #state{max_entries=MaxEntries,queue=OldQueue}=State) ->
Queue = case MaxEntries =< queue:len(OldQueue) of
true -> queue:liat(OldQueue);
false -> OldQueue
end,
{noreply, State#state{queue = queue:in(Msg, Queue)}};

handle_cast(_Msg, State) ->
{noreply, State}.

%% @hidden
-spec handle_call(X::term(), reference(), state()) -> {reply, {Failed::apns:msg(), RestToRetry::[apns:msg()]}, state()}.
handle_call({fail, ID}, _From, #state{queue=Queue}=State) ->
{reply, recover_fail(ID, Queue), State#state{queue=queue:new()}};

handle_call(_Msg, _From, State) ->
{reply, ok, State}.

%% @hidden
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(_Info, State) ->
{noreply, State}.

%% @hidden
-spec terminate(term(), state()) -> ok.
terminate(_Reason, _State) ->
ok.

%% @hidden
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Server implementation, a.k.a.: callbacks
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-spec recover_fail(ID::binary(), Queue::queue()) -> {apns:msg(), [apns:msg()]}.
%@hidden
recover_fail(ID, Queue) ->
Now = apns:expiry(0),
List = queue:to_list(queue:filter(fun
(#apns_msg{expiry=Expiry}) -> Expiry > Now
end, Queue)),
DropWhile = fun(#apns_msg{id=I}) -> I =/= ID end,
case lists:dropwhile(DropWhile, List) of
[Failed|RestToRetry] -> {Failed, RestToRetry};
[] -> {undefined, []}
end.

0 comments on commit 7b7153e

Please sign in to comment.