diff --git a/rebar.config b/rebar.config index 58a2d43..a18ee12 100644 --- a/rebar.config +++ b/rebar.config @@ -23,8 +23,8 @@ %% == Dependencies == {deps, [ - {gun, "1.3.3"}, - {jsx, "3.0.0"}, + {gun, "2.1.0"}, + {jsx, "3.1.0"}, {base64url, "1.0.1"} ]}. diff --git a/rebar.lock b/rebar.lock index be37c6e..f3e6eb0 100644 --- a/rebar.lock +++ b/rebar.lock @@ -2,16 +2,16 @@ [{<<"base64url">>,{pkg,<<"base64url">>,<<"1.0.1">>},0}, {<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.7.3">>},1}, {<<"gun">>,{pkg,<<"gun">>,<<"1.3.3">>},0}, - {<<"jsx">>,{pkg,<<"jsx">>,<<"3.0.0">>},0}]}. + {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},0}]}. [ {pkg_hash,[ {<<"base64url">>, <<"F8C7F2DA04CA9A5D0F5F50258F055E1D699F0E8BF4CFDB30B750865368403CF6">>}, {<<"cowlib">>, <<"A7FFCD0917E6D50B4D5FB28E9E2085A0CEB3C97DEA310505F7460FF5ED764CE9">>}, {<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>}, - {<<"jsx">>, <<"20A170ABD4335FC6DB24D5FAD1E5D677C55DADF83D1B20A8A33B5FE159892A39">>}]}, + {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}]}, {pkg_hash_ext,[ {<<"base64url">>, <<"F9B3ADD4731A02A9B0410398B475B33E7566A695365237A6BDEE1BB447719F5C">>}, {<<"cowlib">>, <<"1E1A3D176D52DAEBBECBBCDFD27C27726076567905C2A9D7398C54DA9D225761">>}, {<<"gun">>, <<"3106CE167F9C9723F849E4FB54EA4A4D814E3996AE243A1C828B256E749041E0">>}, - {<<"jsx">>, <<"37BECA0435F5CA8A2F45F76A46211E76418FBEF80C36F0361C249FC75059DC6D">>}]} + {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}]} ]. diff --git a/src/apns_connection.erl b/src/apns_connection.erl index 75fea77..4f77925 100644 --- a/src/apns_connection.erl +++ b/src/apns_connection.erl @@ -54,6 +54,9 @@ , code_change/4 ]). +%% for spawn/3 +-export([ reply_errors_and_cancel_timers/2 ]). + -export_type([ name/0 , host/0 , port/0 @@ -89,8 +92,18 @@ , proxy_info => proxy_info() }. +-type stream_data() :: #{ from := {pid(), term()} + , stream := gun:stream_ref() + , timer := reference() + , status := non_neg_integer() + , headers := gun:headers() + , body := binary() + }. + -type state() :: #{ connection := connection() , gun_pid => pid() + , gun_streams => #{gun:stream_ref() => stream_data()} + , max_gun_streams := non_neg_integer() , gun_monitor => reference() , gun_connect_ref => reference() , client := pid() @@ -207,6 +220,8 @@ callback_mode() -> state_functions. init({Connection, Client}) -> StateData = #{ connection => Connection , client => Client + , gun_streams => #{} + , max_gun_streams => default_max_gun_streams(Connection) , backoff => 1 , backoff_ceiling => application:get_env(apns, backoff_ceiling, 10) }, @@ -226,12 +241,14 @@ open_connection(internal, _, #{connection := Connection} = StateData) -> open_origin(internal, _, #{connection := Connection} = StateData) -> Host = host(Connection), Port = port(Connection), - TransportOpts = transport_opts(Connection), + TlsOpts = tls_opts(Connection), + Http2Opts = http2_opts(), {next_state, open_common, StateData, {next_event, internal, { Host , Port , #{ protocols => [http2] - , transport_opts => TransportOpts + , http2_opts => Http2Opts + , tls_opts => TlsOpts , retry => 0 }}}}. @@ -274,12 +291,14 @@ proxy_connect_to_origin(internal, on_connect, StateData) -> #{connection := Connection, gun_pid := GunPid} = StateData, Host = host(Connection), Port = port(Connection), - TransportOpts = transport_opts(Connection), + TlsOpts = tls_opts(Connection), + Http2Opts = http2_opts(), Destination0 = #{ host => Host , port => Port , protocol => http2 + , http2_opts => Http2Opts , transport => tls - , tls_opts => TransportOpts + , tls_opts => TlsOpts }, Destination = case proxy(Connection) of #{ username := Username, password := Password } -> @@ -304,27 +323,153 @@ await_tunnel_up(EventType, EventContent, StateData) -> connected(internal, on_connect, #{client := Client}) -> Client ! {connection_up, self()}, keep_state_and_data; -connected( {call, {Client, _} = From} +connected( {call, From} , {push_notification, DeviceId, Notification, Headers} - , #{client := Client} = StateData) -> - #{connection := Connection, gun_pid := GunPid} = StateData, - #{timeout := Timeout} = Connection, - Response = push(GunPid, DeviceId, Headers, Notification, Timeout), - {keep_state_and_data, {reply, From, Response}}; -connected( {call, {Client, _} = From} + , StateData) -> + #{ connection := Connection + , gun_pid := GunPid + , gun_streams := Streams0 + , max_gun_streams := MaxStreams} = StateData, + StreamAllowed = stream_allowed(maps:size(Streams0), MaxStreams), + if + not StreamAllowed -> + {keep_state_and_data, {reply, From, {error, {overload, maps:size(Streams0), MaxStreams}}}}; + true -> + #{timeout := Timeout} = Connection, + StreamRef = send_push(GunPid, DeviceId, Headers, Notification), + Tmr = erlang:send_after(Timeout, self(), {timeout, GunPid, StreamRef}), + StreamData = #{ from => From + , stream => StreamRef + , timer => Tmr + , status => 200 %% b4 we know real status + , headers => [] + , body => <<>> }, + Streams1 = Streams0#{StreamRef => StreamData}, + {keep_state, StateData#{gun_streams => Streams1}} + end; +connected( {call, From} , {push_notification, Token, DeviceId, Notification, Headers0} - , #{client := Client} = StateData) -> - #{connection := Connection, gun_pid := GunConn} = StateData, - #{timeout := Timeout} = Connection, - Headers = add_authorization_header(Headers0, Token), - Response = push(GunConn, DeviceId, Headers, Notification, Timeout), - {keep_state_and_data, {reply, From, Response}}; -connected({call, From}, Event, _) when element(1, Event) =:= push_notification -> - {keep_state_and_data, {reply, From, {error, not_connection_owner}}}; + , StateData0) -> + #{ connection := Connection + , gun_pid := GunPid + , gun_streams := Streams0 + , max_gun_streams := MaxStreams} = StateData0, + StreamAllowed = stream_allowed(maps:size(Streams0), MaxStreams), + if + not StreamAllowed -> + {keep_state_and_data, {reply, From, {error, {overload, maps:size(Streams0), MaxStreams}}}}; + true -> + #{timeout := Timeout} = Connection, + Headers = add_authorization_header(Headers0, Token), + StreamRef = send_push(GunPid, DeviceId, Headers, Notification), + Tmr = erlang:send_after(Timeout, self(), {timeout, GunPid, StreamRef}), + StreamData = #{ from => From + , stream => StreamRef + , timer => Tmr + , status => 200 %% b4 we know real status + , headers => [] + , body => <<>> }, + Streams1 = Streams0#{StreamRef => StreamData}, + {keep_state, StateData0#{gun_streams => Streams1}} + end; connected({call, From}, wait_apns_connection_up, _) -> {keep_state_and_data, {reply, From, ok}}; connected({call, From}, Event, _) when Event =/= gun_pid -> {keep_state_and_data, {reply, From, {error, bad_call}}}; +connected( info + , {gun_response, GunPid, StreamRef, fin, Status, Headers} + , #{gun_pid := GunPid} = StateData0) -> + %% got response without body + #{gun_streams := Streams0} = StateData0, + #{StreamRef := StreamData} = Streams0, + #{from := From} = StreamData, + Streams1 = maps:remove(StreamRef, Streams0), + gun:cancel(GunPid, StreamRef), %% final response, closing stream + gen_statem:reply(From, {Status, Headers, no_body}), + {keep_state, StateData0#{gun_streams => Streams1}}; +connected( info + , {gun_response, GunPid, StreamRef, nofin, Status, Headers} + , #{gun_pid := GunPid} = StateData0) -> + %% update status & headers + #{gun_streams := Streams0} = StateData0, + #{StreamRef := StreamState0} = Streams0, + StreamState1 = StreamState0#{status => Status, headers => Headers}, + Streams1 = Streams0#{StreamRef => StreamState1}, + {keep_state, StateData0#{gun_streams => Streams1}}; +connected( info + , {gun_data, GunPid, StreamRef, fin, Data} + , #{gun_pid := GunPid} = StateData0) -> + %% got data, finally + #{gun_streams := Streams0} = StateData0, + #{StreamRef := StreamData} = Streams0, + #{from := From, status := Status, headers := H, body := B0} = StreamData, + Streams1 = maps:remove(StreamRef, Streams0), + gun:cancel(GunPid, StreamRef), %% final, closing stream + gen_statem:reply(From, {Status, H, <>}), + {keep_state, StateData0#{gun_streams => Streams1}}; +connected( info + , {gun_data, GunPid, StreamRef, nofin, Data} + , #{gun_pid := GunPid} = StateData0) -> + %% add data to buffer, still waiting + #{gun_streams := Streams0} = StateData0, + #{StreamRef := StreamState0} = Streams0, + #{body := B0} = StreamState0, + StreamState1 = StreamState0#{body => <>}, + Streams1 = Streams0#{StreamRef => StreamState1}, + {keep_state, StateData0#{gun_streams => Streams1}}; +connected( info + , {gun_error, GunPid, StreamRef, Reason} + , #{gun_pid := GunPid} = StateData0) -> + %% answering with error, remove entry + #{gun_streams := Streams0} = StateData0, + case maps:get(StreamRef, Streams0, null) of + null -> + %% nothing todo + {keep_state, StateData0}; + StreamData -> + #{from := From} = StreamData, + gen_statem:reply(From, {error, Reason}), + Streams1 = maps:remove(StreamRef, Streams0), + gun:cancel(GunPid, StreamRef), + {keep_state, StateData0#{gun_streams => Streams1}} + end; +connected( info + , {gun_error, GunPid, Reason} + , #{gun_pid := GunPid} = StateData0) -> + %% answer with error for all streams, remove all entries, going to reconnect + #{gun_streams := Streams} = StateData0, + spawn(apns_connection, reply_errors_and_cancel_timers, [Streams, Reason]), + {next_state, down, StateData0#{gun_streams => #{}}, + {next_event, internal, {down, ?FUNCTION_NAME, Reason}}}; +connected( info + , {timeout, GunPid, StreamRef} + , #{gun_pid := GunPid, gun_streams := Streams0} = StateData0) -> + %% gun pid matches, we have to answer {error, timeout} + case maps:find(StreamRef, Streams0) of + {ok, StreamData} -> + #{from := From} = StreamData, + gen_statem:reply(From, {error, timeout}), + Streams1 = maps:remove(StreamRef, Streams0), + gun:cancel(GunPid, StreamRef), + {keep_state, StateData0#{gun_streams => Streams1}}; + error -> + %% cant find stream data by stream ref? + %% may be just answered and removed, + %% ignoring + {keep_state, StateData0} + end; +connected(info, + {timeout, _GunPid, _StreamRef}, + StateData0) -> + %% timeout from different connection? + %% ignoring + {keep_state, StateData0}; +connected( info + , {gun_notify, GunPid, settings_changed, Settings} + , #{gun_pid := GunPid, max_gun_streams := MaxStreams0} = StateData0) -> + %% settings received, if contains max_concurrent_streams, update it + MaxStreams1 = maps:get(max_concurrent_streams, Settings, MaxStreams0), + {keep_state, StateData0#{max_gun_streams => MaxStreams1}}; connected(EventType, EventContent, StateData) -> handle_common(EventType, EventContent, ?FUNCTION_NAME, StateData, drop). @@ -356,9 +501,12 @@ handle_common(cast, stop, _, _, _) -> handle_common( info , {'DOWN', GunMon, process, GunPid, Reason} , StateName - , #{gun_pid := GunPid, gun_monitor := GunMon} = StateData + , #{gun_pid := GunPid, gun_monitor := GunMon} = StateData0 , _) -> - {next_state, down, StateData, + %% gun died, answering with errors, cleanup entries + #{gun_streams := Streams} = StateData0, + spawn(apns_connection, reply_errors_and_cancel_timers, [Streams, Reason]), + {next_state, down, StateData0#{gun_streams => #{}}, {next_event, internal, {down, StateName, Reason}}}; handle_common( state_timeout , EventContent @@ -423,24 +571,44 @@ proxy(#{proxy_info := Proxy}) -> proxy(_) -> undefined. -transport_opts(Connection) -> +-spec default_max_gun_streams(connection()) -> non_neg_integer() | infinity. +default_max_gun_streams(Setts) -> + case type(Setts) of + token -> 1; %% at start, for token we should set 1 + _ -> 100 + end. + +tls_opts(Connection) -> case type(Connection) of certdata -> Cert = certdata(Connection), Key = keydata(Connection), + %% proplist here, because it goes to ssl:connect/3 (by gun) [{cert, Cert}, {key, Key}]; cert -> Certfile = certfile(Connection), Keyfile = keyfile(Connection), [{certfile, Certfile}, {keyfile, Keyfile}]; token -> - [] + [] end. +http2_opts() -> + %% we need to know settings (from APN server), gun expects map + #{notify_settings_changed => true}. + %%%=================================================================== %%% Internal Functions %%%=================================================================== +-spec(stream_allowed(StreamsCount :: non_neg_integer(), + MaxStreams :: non_neg_integer() | infinity) -> + boolean()). +stream_allowed(_StreamsCount, infinity) -> true; +stream_allowed(StreamsCount, MaxStreams) -> + StreamsCount < MaxStreams. + + -spec get_headers(apns:headers()) -> list(). get_headers(Headers) -> List = [ {<<"apns-id">>, apns_id} @@ -467,21 +635,12 @@ get_device_path(DeviceId) -> add_authorization_header(Headers, Token) -> Headers#{apns_auth_token => <<"bearer ", Token/binary>>}. --spec push(pid(), apns:device_id(), apns:headers(), notification(), integer()) -> - apns:stream_id(). -push(GunConn, DeviceId, HeadersMap, Notification, Timeout) -> +-spec send_push(pid(), apns:device_id(), apns:headers(), notification()) -> + gun:stream_ref(). +send_push(GunPid, DeviceId, HeadersMap, Notification) -> Headers = get_headers(HeadersMap), Path = get_device_path(DeviceId), - StreamRef = gun:post(GunConn, Path, Headers, Notification), - case gun:await(GunConn, StreamRef, Timeout) of - {response, fin, Status, ResponseHeaders} -> - {Status, ResponseHeaders, no_body}; - {response, nofin, Status, ResponseHeaders} -> - {ok, Body} = gun:await_body(GunConn, StreamRef, Timeout), - DecodedBody = jsx:decode(Body, [{return_maps, false}]), - {Status, ResponseHeaders, DecodedBody}; - {error, timeout} -> timeout - end. + gun:post(GunPid, Path, Headers, Notification). -spec backoff(non_neg_integer(), non_neg_integer()) -> non_neg_integer(). backoff(N, Ceiling) -> @@ -492,3 +651,18 @@ backoff(N, Ceiling) -> NString = float_to_list(NextN, [{decimals, 0}]), list_to_integer(NString) end. + +%%%=================================================================== +%%% spawn/3 functions +%%%=================================================================== +-spec reply_errors_and_cancel_timers([stream_data()], term()) -> ok. +reply_errors_and_cancel_timers(Streams, Reason) -> + [reply_error_and_cancel_timer(From, Reason, Tmr) || + #{from := From, timer := Tmr} <- maps:values(Streams)], + ok. + +-spec reply_error_and_cancel_timer(From :: {pid(), term()}, Reason :: term(), + Tmr :: reference()) -> ok. +reply_error_and_cancel_timer(From, Reason, Tmr) -> + erlang:cancel_timer(Tmr), + gen_statem:reply(From, {error, Reason}).