diff --git a/lib/fennec.ex b/lib/fennec.ex index b12364f..bd7c994 100644 --- a/lib/fennec.ex +++ b/lib/fennec.ex @@ -19,5 +19,6 @@ defmodule Fennec do @type ip :: :inet.ip_address @type portn :: :inet.port_number - @type client_info :: %{ip: Fennec.ip, port: Fennec.portn} + @type client_info :: %{socket: Fennec.UDP.socket, + ip: Fennec.ip, port: Fennec.portn} end diff --git a/lib/fennec/application.ex b/lib/fennec/application.ex index 68866cf..b6e2a6d 100644 --- a/lib/fennec/application.ex +++ b/lib/fennec/application.ex @@ -5,7 +5,7 @@ defmodule Fennec.Application do def start(_type, _args) do opts = [strategy: :one_for_one, name: Fennec.Supervisor] - Supervisor.start_link(servers(), opts) + Supervisor.start_link([Fennec.ReservationLog.child_spec()] ++ servers(), opts) end defp servers do diff --git a/lib/fennec/evaluator/allocate/request.ex b/lib/fennec/evaluator/allocate/request.ex index d290275..75bf6fd 100644 --- a/lib/fennec/evaluator/allocate/request.ex +++ b/lib/fennec/evaluator/allocate/request.ex @@ -10,16 +10,18 @@ defmodule Fennec.Evaluator.Allocate.Request do alias Jerboa.Format.Body.Attribute.ErrorCode alias Jerboa.Params alias Fennec.TURN + alias Fennec.TURN.Reservation require Integer + require Logger - @even_port_max_retries 100 + @create_relays_max_retries 100 @spec service(Params.t, Fennec.client_info, Fennec.UDP.server_opts, TURN.t) :: {Params.t, TURN.t} def service(params, client, server, turn_state) do request_status = - {:continue, params, %{even_port: nil}} + {:continue, params, %{}} |> maybe(&verify_existing_allocation/5, [client, server, turn_state]) |> maybe(&verify_requested_transport/2) |> maybe(&verify_dont_fragment/2) @@ -36,12 +38,16 @@ defmodule Fennec.Evaluator.Allocate.Request do end defp allocation_params(params, %{ip: a, port: p}, server, - turn_state = %TURN{allocation: allocation}) do + turn_state = %TURN{allocation: allocation}, + reservation_token) do %TURN.Allocation{socket: socket, expire_at: expire_at} = allocation {:ok, {socket_addr, port}} = :inet.sockname(socket) addr = server[:relay_ip] || socket_addr lifetime = max(0, expire_at - Fennec.Time.system_time(:second)) - attrs = [ + attrs = case reservation_token do + :not_requested -> [] + %Attribute.ReservationToken{} -> [reservation_token] + end ++ [ %Attribute.XORMappedAddress{ family: family(a), address: a, @@ -60,39 +66,125 @@ defmodule Fennec.Evaluator.Allocate.Request do end defp allocate(params, state, client, server, turn_state) do - {:ok, socket} = create_relay(state, server) - allocation = %Fennec.TURN.Allocation{ - socket: socket, - expire_at: Fennec.Time.system_time(:second) + TURN.Allocation.default_lifetime(), - req_id: Params.get_id(params), - owner_username: owner_username(params) - } - - new_turn_state = %{turn_state | allocation: allocation} - {:respond, allocation_params(params, client, server, new_turn_state)} - end - - defp create_relay( state, server, retries \\ @even_port_max_retries) - defp create_relay(_state, _server, retries) - when retries < 0, do: {:error, :even_port_max_retries} - defp create_relay( state, server, retries) do - addr = server[:relay_ip] - ## TODO: {:active, true} is not an option for a production system! - {:ok, socket} = :gen_udp.open(0, [:binary, active: true, ip: addr]) - {:ok, {_, port}} = :inet.sockname(socket) - if state.even_port != nil and not Integer.is_even(port) do - :gen_udp.close(socket) - create_relay(state, server, retries - 1) - else - {:ok, socket} + case create_relays(params, state, server) do + {:error, error_code} -> + {:error, error_code} + {:ok, socket, reservation_token} -> + allocation = %Fennec.TURN.Allocation{ + socket: socket, + expire_at: Fennec.Time.system_time(:second) + TURN.Allocation.default_lifetime(), + req_id: Params.get_id(params), + owner_username: owner_username(params) + } + new_turn_state = %{turn_state | allocation: allocation} + {:respond, allocation_params(params, client, server, new_turn_state, + reservation_token)} + end + end + + defp create_relays(params, state, server) do + status = + {:continue, params, create_relays_state(state)} + |> maybe(&requests_reserved_port?/2) + |> maybe(&open_this_relay/3, [server]) + |> maybe(&reserve_another_relay/3, [server]) + case status do + {:respond, state} -> + {:ok, state.this_socket, state.new_reservation_token} + {:continue, _params, state} -> + {:ok, state.this_socket, state.new_reservation_token} + {:error, error_code} -> + {:error, error_code} + end + end + + defp create_relays_state(allocate_state) do + %{this_socket: nil, + this_port: nil, + new_reservation_token: :not_requested, + retries: Map.get(allocate_state, :retries) || @create_relays_max_retries} + end + + defp requests_reserved_port?(params, state) do + case Params.get_attr(params, Attribute.ReservationToken) do + nil -> {:continue, params, state} + %Attribute.ReservationToken{} = token -> + case Fennec.ReservationLog.take(token) do + nil -> + Logger.info fn -> "no reservation: #{inspect(token)}" end + {:error, ErrorCode.new(:insufficient_capacity)} + %Reservation{} = r -> + {:respond, %{state | this_socket: r.socket}} + end + end + end + + defp open_this_relay(_params, %{retries: r}, _server) when r < 0 do + Logger.warn(:max_retries) + {:error, ErrorCode.new(:insufficient_capacity)} + end + defp open_this_relay(params, state, server) do + opts = udp_opts(server) + case {Params.get_attr(params, Attribute.EvenPort), + :gen_udp.open(0, opts)} do + {nil, {:ok, socket}} -> + {:continue, params, %{state | this_socket: socket}} + {%Attribute.EvenPort{}, {:ok, socket}} -> + {:ok, {_, port}} = :inet.sockname(socket) + if Integer.is_even(port) do + {:continue, params, + %{state | this_socket: socket, this_port: port}} + else + :gen_udp.close(socket) + new_state = %{state | retries: state.retries - 1} + open_this_relay(params, new_state, server) + end + {_, {:error, reason}} -> + Logger.warn(":gen_udp.open/2 error: #{reason}, port: 0, opts: #{opts}") + {:error, ErrorCode.new(:insufficient_capacity)} + end + end + + defp reserve_another_relay(params, state, server) do + case Params.get_attr(params, Attribute.EvenPort) do + nil -> {:continue, params, state} + %Attribute.EvenPort{reserved?: false} -> {:continue, params, state} + %Attribute.EvenPort{reserved?: true} -> + port = state.this_port + 1 + opts = udp_opts(server) + case :gen_udp.open(port, opts) do + {:error, :eaddrinuse} -> + ## We can't allocate a pair of consecutive ports. + ## We're jumping back to before we opened state.this_socket! + :gen_udp.close(state.this_socket) + create_relays(params, %{retries: state.retries - 1}, server) + {:error, reason} -> + Logger.warn(":gen_udp.open/2 error: #{reason}, port: #{port}, opts: #{opts}") + {:error, ErrorCode.new(:insufficient_capacity)} + {:ok, socket} -> + token = do_reserve_another_relay(socket) + {:continue, params, %{state | new_reservation_token: token}} + end end end + defp do_reserve_another_relay(socket) do + r = Reservation.new(socket) + :ok = Fennec.ReservationLog.register(r) + ## TODO: expire the reservation! + r.token + end + + defp udp_opts(server) do + ## TODO: {:active, true} is not an option for a production system! + [:binary, active: true, ip: server[:relay_ip]] + end + defp verify_existing_allocation(params, state, client, server, turn_state) do req_id = Params.get_id(params) case turn_state do %TURN{allocation: %TURN.Allocation{req_id: ^req_id}} -> - {:respond, allocation_params(params, client, server, turn_state)} + {:respond, allocation_params(params, client, server, turn_state, :not_requested)} %TURN{allocation: %TURN.Allocation{}} -> {:error, ErrorCode.new(:allocation_mismatch)} %TURN{allocation: nil} -> @@ -125,8 +217,6 @@ defmodule Fennec.Evaluator.Allocate.Request do case Params.get_attr(params, Attribute.ReservationToken) do %Attribute.ReservationToken{} when even_port != nil -> {:error, ErrorCode.new(:bad_request)} - %Attribute.ReservationToken{} -> - {:error, ErrorCode.new(:unknown_attribute)} # Currently unsupported _ -> {:continue, params, state} end @@ -137,8 +227,6 @@ defmodule Fennec.Evaluator.Allocate.Request do case Params.get_attr(params, Attribute.EvenPort) do %Attribute.EvenPort{} when reservation_token != nil -> {:error, ErrorCode.new(:bad_request)} - %Attribute.EvenPort{} = ep -> - {:continue, params, %{state | even_port: ep}} _ -> {:continue, params, state} end diff --git a/lib/fennec/reservation_log.ex b/lib/fennec/reservation_log.ex new file mode 100644 index 0000000..63a6128 --- /dev/null +++ b/lib/fennec/reservation_log.ex @@ -0,0 +1,40 @@ +defmodule Fennec.ReservationLog do + @moduledoc false + + ## Runtime support for storing and fetching pending reservations. + ## I.e. an ETS table owner process. + + alias Fennec.TURN.Reservation + alias Jerboa.Format.Body.Attribute.ReservationToken + + def start_link() do + Agent.start_link(fn -> init_db(__MODULE__) end, name: __MODULE__) + end + + def child_spec() do + Supervisor.Spec.worker(Fennec.ReservationLog, []) + end + + @spec register(Reservation.t) :: :ok | {:error, :exists} + def register(%Reservation{} = r) do + case :ets.insert_new(__MODULE__, Reservation.to_tuple(r)) do + false -> {:error, :exists} + _ -> :ok + end + end + + @spec take(ReservationToken.t) :: Reservation.t | nil + def take(%ReservationToken{} = token) do + case :ets.take(__MODULE__, token.value) do + [] -> nil + [r] -> Reservation.from_tuple(r) + end + end + + defp init_db(table_name) do + ## TODO: largely guesswork here, not load tested + perf_opts = [write_concurrency: true] + ^table_name = :ets.new(table_name, [:public, :named_table] ++ perf_opts) + end + +end diff --git a/lib/fennec/turn/reservation.ex b/lib/fennec/turn/reservation.ex new file mode 100644 index 0000000..e17bb42 --- /dev/null +++ b/lib/fennec/turn/reservation.ex @@ -0,0 +1,41 @@ +defmodule Fennec.TURN.Reservation do + @moduledoc false + + ## A Reservation represents a relay address reserved + ## by an allocation request with a positive `EvenPort.reserved?`. + ## The relay address is created along with a RESERVATION-TOKEN + ## which is returned to the client requesting the allocation, + ## and is stored until another allocation request with the same + ## reservation token is sent by the client. + ## The Reservation is then turned into a full-blown Allocation. + ## This mechanism is used to allocate consecutive port pairs, + ## for example for RTP and RTCP transmissions. + + alias Jerboa.Format.Body.Attribute.ReservationToken + + defstruct [:token, :socket] + + @type t :: %__MODULE__{ + token: ReservationToken.t, + socket: Fennec.UDP.socket + } + + @spec new(Fennec.UDP.socket) :: t + def new(socket) do + %__MODULE__{token: ReservationToken.new(), + socket: socket} + end + + ## Only intended for storing in ETS + def to_tuple(%__MODULE__{} = r) do + %__MODULE__{token: %ReservationToken{value: token}} = r + {token, r.socket} + end + + ## Only intended for storing in ETS + def from_tuple({token, socket}) when is_binary(token) and is_port(socket) do + %__MODULE__{token: %ReservationToken{value: token}, + socket: socket} + end + +end diff --git a/lib/fennec/udp/dispatcher.ex b/lib/fennec/udp/dispatcher.ex index f069385..6eae9fb 100644 --- a/lib/fennec/udp/dispatcher.ex +++ b/lib/fennec/udp/dispatcher.ex @@ -13,9 +13,9 @@ defmodule Fennec.UDP.Dispatcher do # Dispatches data to worker associated with client's # server-reflexive IP and port number - @spec dispatch(atom, atom, UDP.socket, Fennec.ip, Fennec.portn, binary) :: term - def dispatch(dispatcher, worker_sup, socket, ip, port, data) do - case find_or_start_worker(dispatcher, worker_sup, socket, ip, port) do + @spec dispatch(atom, atom, Fennec.client_info, binary) :: term + def dispatch(dispatcher, worker_sup, client, data) do + case find_or_start_worker(dispatcher, worker_sup, client) do {:ok, pid} -> Worker.process_data(pid, data) _ -> @@ -39,16 +39,17 @@ defmodule Fennec.UDP.Dispatcher do Registry.lookup(dispatcher, key(ip, port)) end - defp find_or_start_worker(dispatcher, worker_sup, socket, ip, port) do + defp find_or_start_worker(dispatcher, worker_sup, client) do + %{ip: ip, port: port} = client case lookup_worker(dispatcher, ip, port) do [{_owner, pid}] -> {:ok, pid} [] -> - start_worker(worker_sup, socket, ip, port) + start_worker(worker_sup, client) end end - defp start_worker(worker_sup, socket, ip, port) do - case Worker.start(worker_sup, socket, ip, port) do + defp start_worker(worker_sup, client) do + case Worker.start(worker_sup, client) do {:ok, pid} -> {:ok, pid} _ -> diff --git a/lib/fennec/udp/receiver.ex b/lib/fennec/udp/receiver.ex index 324e921..7da3184 100644 --- a/lib/fennec/udp/receiver.ex +++ b/lib/fennec/udp/receiver.ex @@ -27,8 +27,9 @@ defmodule Fennec.UDP.Receiver do end def handle_info({:udp, socket, ip, port, data}, %{socket: socket} = state) do - _ = Fennec.UDP.Dispatcher.dispatch(state.dispatcher, state.worker_sup, - state.socket, ip, port, data) + ## TODO: refactor to a proper struct? + client = %{socket: socket, ip: ip, port: port} + _ = Fennec.UDP.Dispatcher.dispatch(state.dispatcher, state.worker_sup, client, data) {:noreply, state} end end diff --git a/lib/fennec/udp/worker.ex b/lib/fennec/udp/worker.ex index f2ef1b9..69dd387 100644 --- a/lib/fennec/udp/worker.ex +++ b/lib/fennec/udp/worker.ex @@ -25,9 +25,9 @@ defmodule Fennec.UDP.Worker do } # Starts a UDP worker - @spec start(atom, UDP.socket, Fennec.ip, Fennec.portn) :: {:ok, pid} | :error - def start(worker_sup, socket, ip, port) do - WorkerSupervisor.start_worker(worker_sup, socket, ip, port) + @spec start(atom, Fennec.client_info) :: {:ok, pid} | :error + def start(worker_sup, client) do + WorkerSupervisor.start_worker(worker_sup, client) end # Process UDP datagram which might be STUN message @@ -36,16 +36,15 @@ defmodule Fennec.UDP.Worker do GenServer.cast(pid, {:process_data, data}) end - def start_link(dispatcher, server_opts, socket, ip, port) do - GenServer.start_link(__MODULE__, [dispatcher, server_opts, socket, ip, port]) + def start_link(dispatcher, server_opts, client) do + GenServer.start_link(__MODULE__, [dispatcher, server_opts, client]) end ## GenServer callbacks - def init([dispatcher, server_opts, socket, ip, port]) do - _ = Dispatcher.register_worker(dispatcher, self(), ip, port) - client = %{ip: ip, port: port} - state = %{socket: socket, client: client, nonce_updated_at: 0, + def init([dispatcher, server_opts, client]) do + _ = Dispatcher.register_worker(dispatcher, self(), client.ip, client.port) + state = %{client: client, nonce_updated_at: 0, server: server_opts, turn: %TURN{}} {:ok, state, timeout(state)} end @@ -61,7 +60,7 @@ defmodule Fennec.UDP.Worker do {:ok, {:void, new_turn_state}} -> %{state | turn: new_turn_state} {:ok, {resp, new_turn_state}} -> - :ok = :gen_udp.send(state.socket, state.client.ip, + :ok = :gen_udp.send(state.client.socket, state.client.ip, state.client.port, resp) %{state | turn: new_turn_state} end @@ -92,7 +91,7 @@ defmodule Fennec.UDP.Worker do end def handle_peer_data(:allowed, ip, port, data, state) do - :ok = :gen_udp.send(state.socket, state.client.ip, state.client.port, + :ok = :gen_udp.send(state.client.socket, state.client.ip, state.client.port, Jerboa.Format.encode(data_params(ip, port, data))) state end diff --git a/lib/fennec/udp/worker_supervisor.ex b/lib/fennec/udp/worker_supervisor.ex index 4a7e676..6b1f31b 100644 --- a/lib/fennec/udp/worker_supervisor.ex +++ b/lib/fennec/udp/worker_supervisor.ex @@ -19,9 +19,9 @@ defmodule Fennec.UDP.WorkerSupervisor do end # Starts worker under WorkerSupervisor - @spec start_worker(atom, UDP.socket, Fennec.ip, Fennec.portn) :: {:ok, pid} | :error - def start_worker(worker_sup, socket, ip, port) do - case Supervisor.start_child(worker_sup, [socket, ip, port]) do + @spec start_worker(atom, Fennec.client_info) :: {:ok, pid} | :error + def start_worker(worker_sup, client) do + case Supervisor.start_child(worker_sup, [client]) do {:ok, pid} -> {:ok, pid} _ -> diff --git a/mix.lock b/mix.lock index 1e0268b..9995b71 100644 --- a/mix.lock +++ b/mix.lock @@ -9,7 +9,7 @@ "hackney": {:hex, :hackney, "1.7.1", "e238c52c5df3c3b16ce613d3a51c7220a784d734879b1e231c9babd433ac1cb4", [:rebar3], [{:certifi, "1.0.0", [hex: :certifi, optional: false]}, {:idna, "4.0.0", [hex: :idna, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, optional: false]}]}, "idna": {:hex, :idna, "4.0.0", "10aaa9f79d0b12cf0def53038547855b91144f1bfcc0ec73494f38bb7b9c4961", [:rebar3], []}, "inch_ex": {:hex, :inch_ex, "0.5.6", "418357418a553baa6d04eccd1b44171936817db61f4c0840112b420b8e378e67", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}]}, - "jerboa": {:git, "https://github.com/esl/jerboa.git", "625276d4f605352ea7da78a0bdca1e0dba7ce2d8", []}, + "jerboa": {:git, "https://github.com/esl/jerboa.git", "10ce9603b559193e3bf537924e96648e92b70a42", []}, "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], []}, "meck": {:hex, :meck, "0.8.4", "59ca1cd971372aa223138efcf9b29475bde299e1953046a0c727184790ab1520", [:make, :rebar], []}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], []}, diff --git a/test/fennec/udp/allocate_test.exs b/test/fennec/udp/allocate_test.exs index b958ed0..1578fc8 100644 --- a/test/fennec/udp/allocate_test.exs +++ b/test/fennec/udp/allocate_test.exs @@ -151,6 +151,30 @@ defmodule Fennec.UDP.AllocateTest do end end + test "reserves a higher port if requested" do + ## given a TURN server + addr = {127, 0, 0, 1} + ## when allocating a UDP relay address with an even port + ## and reserving the next port + udp1 = UDP.connect(addr, addr, 1) + on_exit fn -> UDP.close(udp1) end + params1 = UDP.allocate(udp1, attributes: [ + %RequestedTransport{protocol: :udp}, + %EvenPort{reserved?: true} + ]) + %XORRelayedAddress{port: relay_port1} = Params.get_attr(params1, XORRelayedAddress) + reservation_token = Params.get_attr(params1, ReservationToken) + IO.inspect(reservation_token, label: "\nreservation token") + ## then the next allocation with a RESERVATION-TOKEN + ## allocates a relay address with the reserved port + udp2 = UDP.connect(addr, addr, 1) + on_exit fn -> UDP.close(udp2) end + params2 = UDP.allocate(udp2, attributes: [reservation_token]) + %XORRelayedAddress{port: relay_port2} = Params.get_attr(params2, XORRelayedAddress) + assert Integer.is_even(relay_port1) + assert relay_port2 == relay_port1 + 1 + end + end describe "allocation" do diff --git a/test/fennec/udp/auth_template.ex b/test/fennec/udp/auth_template.ex index 4cba5bc..3ec2b65 100644 --- a/test/fennec/udp/auth_template.ex +++ b/test/fennec/udp/auth_template.ex @@ -39,7 +39,7 @@ defmodule Fennec.UDP.AuthTemplate do :allocate -> nil _ -> quote do - UDP.allocate(udp, username) + UDP.allocate(udp, username: username) UDP.create_permissions(udp, [{127,0,0,1}], username) end end diff --git a/test/fennec/udp/server_test.exs b/test/fennec/udp/server_test.exs index 5611a32..4469863 100644 --- a/test/fennec/udp/server_test.exs +++ b/test/fennec/udp/server_test.exs @@ -6,10 +6,11 @@ defmodule Fennec.UDP.ServerTest do {:ok, _} = Fennec.UDP.start(ip: {127, 0, 0, 1}, port: port) expected_name = String.to_atom(~s"Elixir.Fennec.UDP.#{port}") - assert [{^expected_name, _, _, _}] = - Supervisor.which_children(Fennec.Supervisor) + assert [{Fennec.ReservationLog, _, _, _}, {^expected_name, _, _, _}] = + Enum.sort(Supervisor.which_children(Fennec.Supervisor)) assert :ok = Fennec.UDP.stop(port) - assert [] = Supervisor.which_children(Fennec.Supervisor) + assert [{Fennec.ReservationLog, _, _, _}] = + Supervisor.which_children(Fennec.Supervisor) end test "start/1 allows to start multiple servers on different ports" do diff --git a/test/helper/udp.ex b/test/helper/udp.ex index 253bc6d..4a6f7c4 100644 --- a/test/helper/udp.ex +++ b/test/helper/udp.ex @@ -75,13 +75,16 @@ defmodule Helper.UDP do ## UDP Client - def allocate(udp, username \\ @default_user, client_id \\ 0) do + def allocate(udp, opts \\ []) do + opts = Keyword.merge([username: @default_user, + client_id: 0, + attributes: []], opts) id = Params.generate_id() - req = allocate_request(id, [ + req = allocate_request(id, opts[:attributes] ++ [ %RequestedTransport{protocol: :udp}, - %Username{value: username} + %Username{value: opts[:username]} ]) - resp = no_auth(communicate(udp, client_id, req)) + resp = no_auth(communicate(udp, opts[:client_id], req)) params = Format.decode!(resp) %Params{class: :success, method: :allocate,