Skip to content

Commit

Permalink
Merge pull request #89 from sasa1977/support-child-spec
Browse files Browse the repository at this point in the history
Support embedded mode for MC

I.e. running as a part of a supervision tree.
  • Loading branch information
savonarola committed Jan 21, 2022
2 parents 6916c01 + 7298363 commit 6caa3ca
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 61 deletions.
13 changes: 11 additions & 2 deletions EXAMPLES.md
Expand Up @@ -192,8 +192,15 @@ defmodule MC do
alias SMPPEX.Pdu
alias SMPPEX.Pdu.Factory, as: PduFactory

def start(port) do
SMPPEX.MC.start({__MODULE__, []}, [transport_opts: [port: port]])
def child_spec(port) do
Supervisor.child_spec(
{
SMPPEX.MC,
session: {__MODULE__, []},
transport_opts: [port: port]
},
[]
)
end

def init(_socket, _transport, []) do
Expand All @@ -214,3 +221,5 @@ defmodule MC do
end
```

This server can be started by providing `{MC, port}` as a child of some supervisor.

98 changes: 71 additions & 27 deletions lib/smppex/mc.ex
Expand Up @@ -17,18 +17,23 @@ defmodule SMPPEX.MC do
```
2. Start a listener passing implemented behaviour as a callback module.
2. Pass the child specification to a supervisor, using implemented behaviour as a session module:
```elixir
{:ok, listener} = SMPPEX.MC.start({MyESMESession, some_args},
transport_opts: [port: 2775])
Supervisor.start_link(
[
{
SMPPEX.MC,
session: {MyESMESession, session_arg},
transport_opts: [port: 2775]
},
...
],
...
)
```
The important things to note are:
* There is no `start_link` method, since started listener is not a standalone
`GenServer` but a pool of socket acceptors running under `Ranch` supervisor.
* Each received connection is served with its own process which uses passed callback module (`MyESMESession`) for handling connection events. Each process has his own state initialized by `init` callback receiving `socket`, `transport` and a copy of arguments (`some_args`).
Note that each received connection is served with its own process which uses passed callback module (`MyESMESession`) for handling connection events. Each process has his own state initialized by `init` callback receiving `socket`, `transport` and a copy of arguments (`session_arg`).
"""

alias :ranch, as: Ranch
Expand All @@ -43,11 +48,45 @@ defmodule SMPPEX.MC do
| {:error, reason :: term}

@doc """
Starts listener for MC entitiy.
Starts listener for MC entity.
The listener is started in the supervision tree of the `:ranch` application.
Therefore, prefer `child_spec/1`, which allows you to start the MC in your own supervision tree.
The first argument must be a `{module, arg}` tuple, where `module` is the callback module which should implement `SMPPEX.Session` behaviour, while `arg` is the argument passed to the `init` callback each time a new connection is received.
For the list of other options see `child_spec/1`.
"""
def start(mod_with_args, opts \\ []) do
{ref, transport, transport_opts, protocol, protocol_opts} =
ranch_start_args(mod_with_args, opts)

start_result = Ranch.start_listener(ref, transport, transport_opts, protocol, protocol_opts)

case start_result do
{:error, _} = error -> error
{:ok, _, _} -> {:ok, ref}
{:ok, _} -> {:ok, ref}
end
end

@doc """
Returns a supervisor child specification for starting listener for MC entity.
Starting under a supervisor:
`module` is the callback module which should implement `SMPPEX.Session` behaviour.
`args` is the argument passed to the `init` callback each time a new connection is received.
`opts` is a keyword list of different options:
```elixir
Supervisor.start_link(
[
{SMPPEX.MC, session: {MyESMESession, session_arg}, ...},
...
],
...
)
```
Options:
* `:session` (required) a `{module, arg}` tuple, where `module` is the callback module which should implement `SMPPEX.Session` behaviour, while `arg` is the argument passed to the `init` callback each time a new connection is received.
* `:transport` is Ranch transport used for TCP connections: either `ranch_tcp` (the default) or `ranch_ssl`;
* `:transport_opts` is a map of Ranch transport options. The major key is `socket_opts` which contains a list of important options such as `{:port, port}`. The port is set to `0` by default, which means that the listener will accept connections on a random free port. For backward compatibility one can pass a list of socket options instead of `transport_opts` map (as in Ranch 1.x).
* `:session_module` is a module to use as an alternative to `SMPPEX.Session` for handling sessions (if needed). For example, `SMPPEX.TelemetrySession`.
Expand Down Expand Up @@ -78,7 +117,19 @@ defmodule SMPPEX.MC do
The returned value is either `{:ok, ref}` or `{:error, reason}`. The `ref` can be later used
to stop the whole MC listener and all sessions received by it.
"""
def start({_module, _args} = mod_with_args, opts \\ []) do
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
def child_spec(opts) do
# TODO: using fetch! + delete since pop! is supported on 1.10+. Replace this with pop! once we require at least Elixir 1.10.
mod_with_args = Keyword.fetch!(opts, :session)
opts = Keyword.delete(opts, :session)

{ref, transport, transport_opts, protocol, protocol_opts} =
ranch_start_args(mod_with_args, opts)

Ranch.child_spec(ref, transport, transport_opts, protocol, protocol_opts)
end

defp ranch_start_args({_module, _args} = mod_with_args, opts) do
acceptor_count = Keyword.get(opts, :acceptor_count, @default_acceptor_count)
transport = Keyword.get(opts, :transport, @default_transport)

Expand All @@ -92,20 +143,13 @@ defmodule SMPPEX.MC do

session_module = Keyword.get(opts, :session_module, SMPPEX.Session)

start_result =
Ranch.start_listener(
ref,
transport,
transport_opts,
SMPPEX.TransportSession,
{session_module, [mod_with_args, mc_opts]}
)

case start_result do
{:error, _} = error -> error
{:ok, _, _} -> {:ok, ref}
{:ok, _} -> {:ok, ref}
end
{
ref,
transport,
transport_opts,
SMPPEX.TransportSession,
{session_module, [mod_with_args, mc_opts]}
}
end

defp normalize_transport_opts(opts, acceptor_count) when is_list(opts) do
Expand Down
14 changes: 6 additions & 8 deletions test/esme/sync_test.exs
Expand Up @@ -30,14 +30,12 @@ defmodule SMPPEX.ESME.SyncTest do
port = Support.TCP.Helpers.find_free_port()

mc_with_opts = fn handler, opts ->
{:ok, ref} =
MC.start(
{Support.Session, {callback_agent, handler}},
transport_opts: [port: port],
mc_opts: opts
)

ref
start_supervised!({
MC,
session: {Support.Session, {callback_agent, handler}},
transport_opts: [port: port],
mc_opts: opts
})
end

mc = &mc_with_opts.(&1, mc_opts)
Expand Down
8 changes: 2 additions & 6 deletions test/integration/socket_edge_cases_test.exs
Expand Up @@ -11,7 +11,7 @@ defmodule SMPPEX.Integration.SocketEdgeCasesTest do
Process.flag(:trap_exit, true)

port = Helpers.find_free_port()
{:ok, ref} = MC.start(port, "localhost.crt", false)
start_supervised!({MC, {port, "localhost.crt", false}})
{:ok, pid} = ESME.start_link(port)

receive do
Expand All @@ -21,15 +21,13 @@ defmodule SMPPEX.Integration.SocketEdgeCasesTest do
1000 ->
assert false
end

MC.stop(ref)
end

test "socket closed before esme finishes initialization" do
Process.flag(:trap_exit, true)

port = Helpers.find_free_port()
{:ok, ref} = MC.start(port, "localhost.crt", false)
start_supervised!({MC, {port, "localhost.crt", false}})
{:ok, pid} = ESME.start_link(port, 100)

receive do
Expand All @@ -39,7 +37,5 @@ defmodule SMPPEX.Integration.SocketEdgeCasesTest do
1000 ->
assert false
end

MC.stop(ref)
end
end
7 changes: 2 additions & 5 deletions test/integration/ssl_test.exs
Expand Up @@ -10,7 +10,7 @@ defmodule SMPPEX.Integration.SSLTest do

test "pdu exchange" do
port = Helpers.find_free_port()
{:ok, ref} = MC.start(port, "localhost.crt")
start_supervised!({MC, {port, "localhost.crt"}})

{:ok, _pid} = ESME.start_link(port)

Expand All @@ -22,8 +22,6 @@ defmodule SMPPEX.Integration.SSLTest do
1000 ->
assert false
end

MC.stop(ref)
end

test "ssl handshake fail" do
Expand All @@ -32,9 +30,8 @@ defmodule SMPPEX.Integration.SSLTest do
case :string.to_integer(otp_release) do
{n, ''} when n >= 20 ->
port = Helpers.find_free_port()
{:ok, ref} = MC.start(port, "badhost.crt")
start_supervised!({MC, {port, "badhost.crt"}})
{:error, {:tls_alert, _}} = ESME.start_link(port)
MC.stop(ref)

_ ->
assert true
Expand Down
12 changes: 12 additions & 0 deletions test/mc_test.exs
Expand Up @@ -27,4 +27,16 @@ defmodule SMPPEX.MCTest do

assert :ok == MC.stop(mc_server)
end

test "child_spec" do
{:ok, pid} = Agent.start_link(fn -> [] end)
handler = fn {:init, _socket, _transport}, st -> {:ok, st} end

assert {:ok, _pid} =
start_supervised({
MC,
session: {Support.Session, {pid, handler}},
transport_opts: %{socket_opts: [port: 0]}
})
end
end
30 changes: 17 additions & 13 deletions test/support/ssl/mc.ex
Expand Up @@ -7,22 +7,26 @@ defmodule Support.SSL.MC do
alias SMPPEX.Pdu
alias SMPPEX.Pdu.Factory, as: PduFactory

def start(port, certname, accept \\ true) do
MC.start(
{__MODULE__, [accept]},
transport: :ranch_ssl,
transport_opts: %{
socket_opts: [
port: port,
certfile: 'test/support/ssl/#{certname}',
keyfile: 'test/support/ssl/cert.key'
]
}
def child_spec({port, certname}), do: child_spec({port, certname, true})

def child_spec({port, certname, accept}) do
Supervisor.child_spec(
{
MC,
session: {__MODULE__, [accept]},
transport: :ranch_ssl,
transport_opts: %{
socket_opts: [
port: port,
certfile: 'test/support/ssl/#{certname}',
keyfile: 'test/support/ssl/cert.key'
]
}
},
[]
)
end

def stop(ref), do: MC.stop(ref)

@impl true
def init(_socket, _transport, [accept]) do
if accept do
Expand Down

0 comments on commit 6caa3ca

Please sign in to comment.