From f2be1edd56c7117352e578d41d275184c48d991a Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Tue, 25 Mar 2014 13:00:58 +0200 Subject: [PATCH 1/2] APNS resend queue --- src/apns_connection.erl | 9 ++- src/apns_queue.erl | 123 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 src/apns_queue.erl diff --git a/src/apns_connection.erl b/src/apns_connection.erl index cb3f770..8bbfb61 100644 --- a/src/apns_connection.erl +++ b/src/apns_connection.erl @@ -19,7 +19,8 @@ in_socket :: tuple(), connection :: apns:connection(), in_buffer = <<>> :: binary(), - out_buffer = <<>> :: binary()}). + out_buffer = <<>> :: binary(), + queue :: pid()}). -type state() :: #state{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -68,12 +69,14 @@ build_payload(Msg) -> -spec init(apns:connection()) -> {ok, state()} | {stop, term()}. init(Connection) -> try + {ok, QID} = apns_queue:start_link(), case open_out(Connection) of {ok, OutSocket} -> case open_feedback(Connection) of {ok, InSocket} -> {ok, #state{ out_socket = OutSocket , in_socket = InSocket , connection = Connection + , queue = QID }}; {error, Reason} -> {stop, Reason} end; @@ -156,11 +159,13 @@ handle_cast(Msg, State) when is_record(Msg, apns_msg) -> Socket = State#state.out_socket, Payload = build_payload(Msg), BinToken = hexstr_to_bin(Msg#apns_msg.device_token), + apns_queue:in(State#state.queue, Msg), case send_payload( Socket, Msg#apns_msg.id, Msg#apns_msg.expiry, BinToken, Payload, Msg#apns_msg.priority) of ok -> {noreply, State}; {error, Reason} -> + apns_queue:fail(State#state.queue, Msg#apns_msg.id), {stop, {error, Reason}, State} end; @@ -181,6 +186,8 @@ handle_info( {ssl, SslSocket, Data} case Command of 8 -> %% Error Status = parse_status(StatusCode), + {_MsgFailed, RestMsg} = apns_queue:fail(State#state.queue, MsgId), + [send_message(self(), M) || M <- RestMsg], try Error(MsgId, Status) of stop -> throw({stop, {msg_error, MsgId, Status}, State}); _ -> noop diff --git a/src/apns_queue.erl b/src/apns_queue.erl new file mode 100644 index 0000000..6c76c86 --- /dev/null +++ b/src/apns_queue.erl @@ -0,0 +1,123 @@ +%%%------------------------------------------------------------------- +%%% @author Manuel Rubio +%%% @copyright (C) 2014 Altenwald Solutions, S.L. +%%% @doc apns4erl queue for recover in failure +%%% @end +%%%------------------------------------------------------------------- +-module(apns_queue). +-author('Manuel Rubio '). + +-include("apns.hrl"). + +-behaviour(gen_server). + +-define(QUEUE, queue). +-define(DEFAULT_MAX_ENTRIES, 1000). + +-export([ + start_link/0, + stop/1, + + in/2, + fail/2, + + % callbacks + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, { + queue = ?QUEUE:new() :: queue(), + max_entries = ?DEFAULT_MAX_ENTRIES :: pos_integer() +}). + +-type state() :: #state{}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Public API +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% @doc Stops the connection +-spec stop(QID :: pid()) -> ok. +stop(QID) -> + gen_server:cast(QID, stop). + +-spec in(QID :: pid(), Msg :: apns:msg()) -> ok. +in(QID, Msg) -> + gen_server:cast(QID, {in, Msg}). + +-spec fail(QID :: pid(), ID :: binary()) -> [apns:msg()]. +fail(QID, ID) -> + gen_server:call(QID, {fail, ID}). + +%% @hidden +-spec start_link() -> {ok, pid()} | {error, {already_started, pid()}}. +start_link() -> + gen_server:start_link(?MODULE, [], []). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Server implementation, a.k.a.: callbacks +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% @hidden +-spec init([]) -> {ok, state()}. +init([]) -> + {ok, #state{}}. + +%% @hidden +-spec handle_cast(stop | term(), state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}. +handle_cast(stop, State) -> + {stop, normal, State}; + +handle_cast({in, Msg}, #state{max_entries=MaxEntries,queue=OldQueue}=State) -> + Queue = case MaxEntries =< ?QUEUE:len(OldQueue) of + true -> ?QUEUE:liat(OldQueue); + false -> OldQueue + end, + {noreply, State#state{queue = ?QUEUE:in(Msg, Queue)}}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +%% @hidden +-spec handle_call(X::term(), reference(), state()) -> {reply, {Failed::apns:msg(), RestToRetry::[apns:msg()]}, state()}. +handle_call({fail, ID}, _From, #state{queue=Queue}=State) -> + {reply, recover_fail(ID, Queue), State#state{queue=?QUEUE:new()}}; + +handle_call(_Msg, _From, State) -> + {reply, ok, State}. + +%% @hidden +-spec handle_info(term(), state()) -> {noreply, state()}. +handle_info(_Info, State) -> + {noreply, State}. + +%% @hidden +-spec terminate(term(), state()) -> ok. +terminate(_Reason, _State) -> + ok. + +%% @hidden +-spec code_change(term(), state(), term()) -> {ok, state()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Server implementation, a.k.a.: callbacks +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec recover_fail(ID::binary(), Queue::queue()) -> {apns:msg(), [apns:msg()]}. +%@hidden +recover_fail(ID, Queue) -> + Now = apns:expiry(0), + List = ?QUEUE:to_list(?QUEUE:filter(fun + (#apns_msg{expiry=Expiry}) -> Expiry > Now + end, Queue)), + DropWhile = fun(#apns_msg{id=I}) -> I =/= ID end, + case lists:dropwhile(DropWhile, List) of + [Failed|RestToRetry] -> {Failed, RestToRetry}; + [] -> {undefined, []} + end. From f8072bcf726731c98c3122f6ea7bae1aea5096a3 Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Thu, 1 Jan 2015 01:59:29 +0100 Subject: [PATCH 2/2] remove QUEUE macro --- src/apns_queue.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/apns_queue.erl b/src/apns_queue.erl index 6c76c86..52ea428 100644 --- a/src/apns_queue.erl +++ b/src/apns_queue.erl @@ -11,7 +11,6 @@ -behaviour(gen_server). --define(QUEUE, queue). -define(DEFAULT_MAX_ENTRIES, 1000). -export([ @@ -30,7 +29,7 @@ code_change/3]). -record(state, { - queue = ?QUEUE:new() :: queue(), + queue = queue:new() :: queue(), max_entries = ?DEFAULT_MAX_ENTRIES :: pos_integer() }). @@ -73,11 +72,11 @@ handle_cast(stop, State) -> {stop, normal, State}; handle_cast({in, Msg}, #state{max_entries=MaxEntries,queue=OldQueue}=State) -> - Queue = case MaxEntries =< ?QUEUE:len(OldQueue) of - true -> ?QUEUE:liat(OldQueue); + Queue = case MaxEntries =< queue:len(OldQueue) of + true -> queue:liat(OldQueue); false -> OldQueue end, - {noreply, State#state{queue = ?QUEUE:in(Msg, Queue)}}; + {noreply, State#state{queue = queue:in(Msg, Queue)}}; handle_cast(_Msg, State) -> {noreply, State}. @@ -85,7 +84,7 @@ handle_cast(_Msg, State) -> %% @hidden -spec handle_call(X::term(), reference(), state()) -> {reply, {Failed::apns:msg(), RestToRetry::[apns:msg()]}, state()}. handle_call({fail, ID}, _From, #state{queue=Queue}=State) -> - {reply, recover_fail(ID, Queue), State#state{queue=?QUEUE:new()}}; + {reply, recover_fail(ID, Queue), State#state{queue=queue:new()}}; handle_call(_Msg, _From, State) -> {reply, ok, State}. @@ -113,7 +112,7 @@ code_change(_OldVsn, State, _Extra) -> %@hidden recover_fail(ID, Queue) -> Now = apns:expiry(0), - List = ?QUEUE:to_list(?QUEUE:filter(fun + List = queue:to_list(queue:filter(fun (#apns_msg{expiry=Expiry}) -> Expiry > Now end, Queue)), DropWhile = fun(#apns_msg{id=I}) -> I =/= ID end,