Skip to content

Commit

Permalink
Merge 8b4b148 into 88abd94
Browse files Browse the repository at this point in the history
  • Loading branch information
hpopp committed Jun 7, 2021
2 parents 88abd94 + 8b4b148 commit 6bf635d
Show file tree
Hide file tree
Showing 40 changed files with 569 additions and 507 deletions.
2 changes: 1 addition & 1 deletion .formatter.exs
@@ -1,4 +1,4 @@
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
line_length: 90
line_length: 80
]
4 changes: 4 additions & 0 deletions .gitignore
Expand Up @@ -25,6 +25,10 @@ pigeon-*.tar
# Temporary files for e.g. tests.
/tmp

# Ignore dialyzer files
/priv/plts/*.plt
/priv/plts/*.plt.hash

# Misc.
cert.pem
key_unencrypted.pem
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,8 @@
([#183](https://github.com/codedge-llc/pigeon/pull/183))
- Kadabra bumped to v0.5.0, and now a required dependency.
([#184](https://github.com/codedge-llc/pigeon/pull/184))
- Sending a list of pushes synchronously now actually sends them one at a time. For production
workloads, using the async `:on_response` callback is strongly suggested.

**Fixed**

Expand All @@ -21,6 +23,7 @@
- `:certfile` and `:keyfile` are no longer valid options for APNS configurations.
Instead, read the file before loading (e.g. `cert: File.read!("cert.pem")`)
([#183](https://github.com/codedge-llc/pigeon/pull/183))
- `:debug_log` removed.

## v1.6.0

Expand Down
3 changes: 0 additions & 3 deletions config/test.exs
Expand Up @@ -8,9 +8,6 @@ config :pigeon, :test,
apns_key: File.read!("key_unencrypted.pem"),
apns_topic: System.get_env("APNS_TOPIC")

config :pigeon,
debug_log: true

config :pigeon, PigeonTest.ADM,
adapter: Pigeon.ADM,
client_id: System.get_env("ADM_OAUTH2_CLIENT_ID"),
Expand Down
88 changes: 40 additions & 48 deletions lib/pigeon.ex
Expand Up @@ -25,6 +25,8 @@ defmodule Pigeon do
See `Pigeon.Adapter` for instructions.
"""

alias Pigeon.Tasks

@default_timeout 5_000

@typedoc ~S"""
Expand All @@ -46,7 +48,12 @@ defmodule Pigeon do
n = Pigeon.ADM.Notification.new("token", %{"message" => "test"})
Pigeon.ADM.push(n, on_response: handler)
"""
@type on_response :: (Notification.t() -> no_return)
@type on_response ::
(notification -> no_return)
| {module, atom}
| {module, atom, [any]}

@type notification :: %{__meta__: Pigeon.Metadata.t()}

@typedoc ~S"""
Options for sending push notifications.
Expand All @@ -60,6 +67,17 @@ defmodule Pigeon do
timeout: non_neg_integer
]

@doc """
Returns the configured default pool size for Pigeon dispatchers.
To customize this value, include the following in your config/config.exs:
config :pigeon, :default_pool_size, 5
"""
@spec default_pool_size :: pos_integer
def default_pool_size() do
Application.get_env(:pigeon, :default_pool_size, 5)
end

@doc """
Returns the configured JSON encoding library for Pigeon.
To customize the JSON library, include the following in your config/config.exs:
Expand All @@ -71,39 +89,23 @@ defmodule Pigeon do
Application.get_env(:pigeon, :json_library, Jason)
end

def debug_log?, do: Application.get_env(:pigeon, :debug_log, false)

@spec push(pid | atom, notification :: struct | [struct], push_opts) ::
{:ok, notification :: struct}
| {:error, notification :: struct}
| :ok
@spec push(pid | atom, notification :: struct, push_opts) ::
notification :: struct | no_return
@spec push(pid | atom, notifications :: [struct, ...], push_opts) ::
notifications :: [struct, ...] | no_return
def push(pid, notifications, opts \\ [])

def push(pid, notifications, opts) when is_list(notifications) do
if Keyword.has_key?(opts, :on_response) do
push_async(pid, notifications, opts[:on_response])
else
timeout = Keyword.get(opts, :timeout, @default_timeout)

notifications
|> Enum.map(&Task.async(fn -> push_sync(pid, &1, timeout) end))
|> Task.yield_many(timeout + 500)
|> Enum.map(fn {task, response} ->
case response do
nil -> Task.shutdown(task, :brutal_kill)
{:ok, resp} -> resp
_error -> nil
end
end)
end
for n <- notifications, do: push(pid, n, opts)
end

def push(pid, notification, opts) do
timeout = Keyword.get(opts, :timeout, @default_timeout)

if Keyword.has_key?(opts, :on_response) do
push_async(pid, notification, opts[:on_response])
on_response = Keyword.get(opts, :on_response)
notification = put_on_response(notification, on_response)
push_async(pid, notification)
else
timeout = Keyword.get(opts, :timeout, @default_timeout)
push_sync(pid, notification, timeout)
end
end
Expand All @@ -112,8 +114,9 @@ defmodule Pigeon do
myself = self()
ref = :erlang.make_ref()
on_response = fn x -> send(myself, {:"$push", ref, x}) end
notification = put_on_response(notification, on_response)

push_async(pid, notification, on_response)
push_async(pid, notification)

receive do
{:"$push", ^ref, x} -> x
Expand All @@ -122,30 +125,19 @@ defmodule Pigeon do
end
end

defp push_async(pid, notifications, on_response)
when is_list(notifications) do
for n <- notifications, do: push_async(pid, n, on_response)
end

defp push_async(pid, notification, nil) do
GenServer.cast(pid, {:push, notification, nil})
end
defp push_async(pid, notification) do
case Pigeon.Registry.next(pid) do
nil ->
Tasks.process_on_response(%{notification | response: :not_started})

defp push_async(pid, notification, on_response) when is_pid(pid) do
if Process.alive?(pid) do
GenServer.cast(pid, {:push, notification, on_response})
else
on_response.(%{notification | response: :not_started})
pid ->
send(pid, {:"$push", notification})
:ok
end
end

defp push_async(pid, notification, on_response) do
case Process.whereis(pid) do
nil ->
on_response.(%{notification | response: :not_started})

_pid ->
GenServer.cast(pid, {:push, notification, on_response})
end
defp put_on_response(notification, on_response) do
meta = %{notification.__meta__ | on_response: on_response}
%{notification | __meta__: meta}
end
end
12 changes: 4 additions & 8 deletions lib/pigeon/adapter.ex
Expand Up @@ -9,7 +9,7 @@ defmodule Pigeon.Adapter do
```
defmodule Pigeon.Sandbox do
import Pigeon.Tasks, only: [process_on_response: 2]
import Pigeon.Tasks, only: [process_on_response: 1]
@behaviour Pigeon.Adapter
Expand Down Expand Up @@ -41,9 +41,9 @@ defmodule Pigeon.Adapter do
Invoked when the server is started.
Return value should be `{:ok, state}` for the `Pigeon.Dispatcher` state,
or `{:error, atom}` if started with invalid configuration options.
or `{:stop, atom}` if started with invalid configuration options.
"""
@callback init(opts :: Keyword.t()) :: {:ok, term} | {:error, atom}
@callback init(opts :: Keyword.t()) :: {:ok, any} | {:stop, any}

@doc """
Invoked to handle all other messages.
Expand All @@ -53,11 +53,7 @@ defmodule Pigeon.Adapter do
@doc """
Invoked to handle push notifications.
"""
@callback handle_push(
notification :: struct | [struct],
on_response :: Pigeon.on_response(),
state :: term
) ::
@callback handle_push(notification :: struct | [struct], state :: term) ::
{:noreply, new_state :: term}
| {:stop, reason :: term, new_state :: term}
end
78 changes: 35 additions & 43 deletions lib/pigeon/adm.ex
Expand Up @@ -140,7 +140,7 @@ defmodule Pigeon.ADM do

@behaviour Pigeon.Adapter

import Pigeon.Tasks, only: [process_on_response: 2]
import Pigeon.Tasks, only: [process_on_response: 1]
alias Pigeon.ADM.{Config, ResultParser}
require Logger

Expand All @@ -167,15 +167,17 @@ defmodule Pigeon.ADM do
end

@impl true
def handle_push(notification, on_response, state) do
def handle_push(notification, state) do
case refresh_access_token_if_needed(state) do
{:ok, state} ->
:ok = do_push(notification, state, on_response)
:ok = do_push(notification, state)
{:noreply, state}

{:error, reason} ->
notification = %{notification | response: reason}
process_on_response(on_response, notification)
notification
|> Map.put(:response, reason)
|> process_on_response()

{:noreply, state}
end
end
Expand Down Expand Up @@ -281,29 +283,21 @@ defmodule Pigeon.ADM do
[{"Content-Type", "application/x-www-form-urlencoded;charset=UTF-8"}]
end

defp do_push(notification, state, on_response) do
defp do_push(notification, state) do
request = {notification.registration_id, encode_payload(notification)}

response =
case on_response do
nil ->
fn {reg_id, payload} ->
HTTPoison.post(adm_uri(reg_id), payload, adm_headers(state))
end

_ ->
fn {reg_id, payload} ->
case HTTPoison.post(adm_uri(reg_id), payload, adm_headers(state)) do
{:ok, %HTTPoison.Response{status_code: status, body: body}} ->
notification = %{notification | registration_id: reg_id}
process_response(status, body, notification, on_response)

{:error, %HTTPoison.Error{reason: :connect_timeout}} ->
notification = %{notification | response: :timeout}
process_on_response(on_response, notification)
end
end
response = fn {reg_id, payload} ->
case HTTPoison.post(adm_uri(reg_id), payload, adm_headers(state)) do
{:ok, %HTTPoison.Response{status_code: status, body: body}} ->
notification = %{notification | registration_id: reg_id}
process_response(status, body, notification)

{:error, %HTTPoison.Error{reason: :connect_timeout}} ->
notification
|> Map.put(:response, :timeout)
|> process_on_response()
end
end

Task.Supervisor.start_child(Pigeon.Tasks, fn -> response.(request) end)
:ok
Expand Down Expand Up @@ -349,38 +343,36 @@ defmodule Pigeon.ADM do
payload |> Map.put("md5", md5)
end

defp process_response(200, body, notification, on_response),
do: handle_200_status(body, notification, on_response)
defp process_response(200, body, notification),
do: handle_200_status(body, notification)

defp process_response(status, body, notification, on_response),
do: handle_error_status_code(status, body, notification, on_response)
defp process_response(status, body, notification),
do: handle_error_status_code(status, body, notification)

defp handle_200_status(body, notification, on_response) do
defp handle_200_status(body, notification) do
{:ok, json} = Pigeon.json_library().decode(body)
parse_result(notification, json, on_response)

notification
|> ResultParser.parse(json)
|> process_on_response()
end

defp handle_error_status_code(status, body, notification, on_response) do
defp handle_error_status_code(status, body, notification) do
case Pigeon.json_library().decode(body) do
{:ok, %{"reason" => _reason} = result_json} ->
parse_result(notification, result_json, on_response)
notification
|> ResultParser.parse(result_json)
|> process_on_response()

{:error, _} ->
n = %{notification | response: generic_error_reason(status)}
process_on_response(on_response, n)
notification
|> Map.put(:response, generic_error_reason(status))
|> process_on_response()
end
end

defp generic_error_reason(400), do: :invalid_json
defp generic_error_reason(401), do: :authentication_error
defp generic_error_reason(500), do: :internal_server_error
defp generic_error_reason(_), do: :unknown_error

# no on_response callback, ignore
@doc false
def parse_result(_, _, nil), do: :ok

def parse_result(notification, response, on_response) do
ResultParser.parse(notification, response, on_response)
end
end
3 changes: 2 additions & 1 deletion lib/pigeon/adm/notification.ex
Expand Up @@ -3,7 +3,8 @@ defmodule Pigeon.ADM.Notification do
Defines Amazon ADM notification struct and convenience constructor functions.
"""

defstruct consolidation_key: nil,
defstruct __meta__: %Pigeon.Metadata{},
consolidation_key: nil,
expires_after: 604_800,
md5: nil,
payload: %{},
Expand Down

0 comments on commit 6bf635d

Please sign in to comment.