Skip to content

Commit

Permalink
Merge pull request #39 from esl/allocate-reserved-relay-address
Browse files Browse the repository at this point in the history
Allocate reserved relay address
  • Loading branch information
arkgil committed Apr 28, 2017
2 parents 5033fdd + 562c3db commit edeba11
Show file tree
Hide file tree
Showing 14 changed files with 267 additions and 68 deletions.
3 changes: 2 additions & 1 deletion lib/fennec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ defmodule Fennec do
@type ip :: :inet.ip_address
@type portn :: :inet.port_number

@type client_info :: %{ip: Fennec.ip, port: Fennec.portn}
@type client_info :: %{socket: Fennec.UDP.socket,
ip: Fennec.ip, port: Fennec.portn}
end
2 changes: 1 addition & 1 deletion lib/fennec/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Fennec.Application do

def start(_type, _args) do
opts = [strategy: :one_for_one, name: Fennec.Supervisor]
Supervisor.start_link(servers(), opts)
Supervisor.start_link([Fennec.ReservationLog.child_spec()] ++ servers(), opts)
end

defp servers do
Expand Down
156 changes: 122 additions & 34 deletions lib/fennec/evaluator/allocate/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ defmodule Fennec.Evaluator.Allocate.Request do
alias Jerboa.Format.Body.Attribute.ErrorCode
alias Jerboa.Params
alias Fennec.TURN
alias Fennec.TURN.Reservation

require Integer
require Logger

@even_port_max_retries 100
@create_relays_max_retries 100

@spec service(Params.t, Fennec.client_info, Fennec.UDP.server_opts, TURN.t)
:: {Params.t, TURN.t}
def service(params, client, server, turn_state) do
request_status =
{:continue, params, %{even_port: nil}}
{:continue, params, %{}}
|> maybe(&verify_existing_allocation/5, [client, server, turn_state])
|> maybe(&verify_requested_transport/2)
|> maybe(&verify_dont_fragment/2)
Expand All @@ -36,12 +38,16 @@ defmodule Fennec.Evaluator.Allocate.Request do
end

defp allocation_params(params, %{ip: a, port: p}, server,
turn_state = %TURN{allocation: allocation}) do
turn_state = %TURN{allocation: allocation},
reservation_token) do
%TURN.Allocation{socket: socket, expire_at: expire_at} = allocation
{:ok, {socket_addr, port}} = :inet.sockname(socket)
addr = server[:relay_ip] || socket_addr
lifetime = max(0, expire_at - Fennec.Time.system_time(:second))
attrs = [
attrs = case reservation_token do
:not_requested -> []
%Attribute.ReservationToken{} -> [reservation_token]
end ++ [
%Attribute.XORMappedAddress{
family: family(a),
address: a,
Expand All @@ -60,39 +66,125 @@ defmodule Fennec.Evaluator.Allocate.Request do
end

defp allocate(params, state, client, server, turn_state) do
{:ok, socket} = create_relay(state, server)
allocation = %Fennec.TURN.Allocation{
socket: socket,
expire_at: Fennec.Time.system_time(:second) + TURN.Allocation.default_lifetime(),
req_id: Params.get_id(params),
owner_username: owner_username(params)
}

new_turn_state = %{turn_state | allocation: allocation}
{:respond, allocation_params(params, client, server, new_turn_state)}
end

defp create_relay( state, server, retries \\ @even_port_max_retries)
defp create_relay(_state, _server, retries)
when retries < 0, do: {:error, :even_port_max_retries}
defp create_relay( state, server, retries) do
addr = server[:relay_ip]
## TODO: {:active, true} is not an option for a production system!
{:ok, socket} = :gen_udp.open(0, [:binary, active: true, ip: addr])
{:ok, {_, port}} = :inet.sockname(socket)
if state.even_port != nil and not Integer.is_even(port) do
:gen_udp.close(socket)
create_relay(state, server, retries - 1)
else
{:ok, socket}
case create_relays(params, state, server) do
{:error, error_code} ->
{:error, error_code}
{:ok, socket, reservation_token} ->
allocation = %Fennec.TURN.Allocation{
socket: socket,
expire_at: Fennec.Time.system_time(:second) + TURN.Allocation.default_lifetime(),
req_id: Params.get_id(params),
owner_username: owner_username(params)
}
new_turn_state = %{turn_state | allocation: allocation}
{:respond, allocation_params(params, client, server, new_turn_state,
reservation_token)}
end
end

defp create_relays(params, state, server) do
status =
{:continue, params, create_relays_state(state)}
|> maybe(&requests_reserved_port?/2)
|> maybe(&open_this_relay/3, [server])
|> maybe(&reserve_another_relay/3, [server])
case status do
{:respond, state} ->
{:ok, state.this_socket, state.new_reservation_token}
{:continue, _params, state} ->
{:ok, state.this_socket, state.new_reservation_token}
{:error, error_code} ->
{:error, error_code}
end
end

defp create_relays_state(allocate_state) do
%{this_socket: nil,
this_port: nil,
new_reservation_token: :not_requested,
retries: Map.get(allocate_state, :retries) || @create_relays_max_retries}
end

defp requests_reserved_port?(params, state) do
case Params.get_attr(params, Attribute.ReservationToken) do
nil -> {:continue, params, state}
%Attribute.ReservationToken{} = token ->
case Fennec.ReservationLog.take(token) do
nil ->
Logger.info fn -> "no reservation: #{inspect(token)}" end
{:error, ErrorCode.new(:insufficient_capacity)}
%Reservation{} = r ->
{:respond, %{state | this_socket: r.socket}}
end
end
end

defp open_this_relay(_params, %{retries: r}, _server) when r < 0 do
Logger.warn(:max_retries)
{:error, ErrorCode.new(:insufficient_capacity)}
end
defp open_this_relay(params, state, server) do
opts = udp_opts(server)
case {Params.get_attr(params, Attribute.EvenPort),
:gen_udp.open(0, opts)} do
{nil, {:ok, socket}} ->
{:continue, params, %{state | this_socket: socket}}
{%Attribute.EvenPort{}, {:ok, socket}} ->
{:ok, {_, port}} = :inet.sockname(socket)
if Integer.is_even(port) do
{:continue, params,
%{state | this_socket: socket, this_port: port}}
else
:gen_udp.close(socket)
new_state = %{state | retries: state.retries - 1}
open_this_relay(params, new_state, server)
end
{_, {:error, reason}} ->
Logger.warn(":gen_udp.open/2 error: #{reason}, port: 0, opts: #{opts}")
{:error, ErrorCode.new(:insufficient_capacity)}
end
end

defp reserve_another_relay(params, state, server) do
case Params.get_attr(params, Attribute.EvenPort) do
nil -> {:continue, params, state}
%Attribute.EvenPort{reserved?: false} -> {:continue, params, state}
%Attribute.EvenPort{reserved?: true} ->
port = state.this_port + 1
opts = udp_opts(server)
case :gen_udp.open(port, opts) do
{:error, :eaddrinuse} ->
## We can't allocate a pair of consecutive ports.
## We're jumping back to before we opened state.this_socket!
:gen_udp.close(state.this_socket)
create_relays(params, %{retries: state.retries - 1}, server)
{:error, reason} ->
Logger.warn(":gen_udp.open/2 error: #{reason}, port: #{port}, opts: #{opts}")
{:error, ErrorCode.new(:insufficient_capacity)}
{:ok, socket} ->
token = do_reserve_another_relay(socket)
{:continue, params, %{state | new_reservation_token: token}}
end
end
end

defp do_reserve_another_relay(socket) do
r = Reservation.new(socket)
:ok = Fennec.ReservationLog.register(r)
## TODO: expire the reservation!
r.token
end

defp udp_opts(server) do
## TODO: {:active, true} is not an option for a production system!
[:binary, active: true, ip: server[:relay_ip]]
end

defp verify_existing_allocation(params, state, client, server, turn_state) do
req_id = Params.get_id(params)
case turn_state do
%TURN{allocation: %TURN.Allocation{req_id: ^req_id}} ->
{:respond, allocation_params(params, client, server, turn_state)}
{:respond, allocation_params(params, client, server, turn_state, :not_requested)}
%TURN{allocation: %TURN.Allocation{}} ->
{:error, ErrorCode.new(:allocation_mismatch)}
%TURN{allocation: nil} ->
Expand Down Expand Up @@ -125,8 +217,6 @@ defmodule Fennec.Evaluator.Allocate.Request do
case Params.get_attr(params, Attribute.ReservationToken) do
%Attribute.ReservationToken{} when even_port != nil ->
{:error, ErrorCode.new(:bad_request)}
%Attribute.ReservationToken{} ->
{:error, ErrorCode.new(:unknown_attribute)} # Currently unsupported
_ ->
{:continue, params, state}
end
Expand All @@ -137,8 +227,6 @@ defmodule Fennec.Evaluator.Allocate.Request do
case Params.get_attr(params, Attribute.EvenPort) do
%Attribute.EvenPort{} when reservation_token != nil ->
{:error, ErrorCode.new(:bad_request)}
%Attribute.EvenPort{} = ep ->
{:continue, params, %{state | even_port: ep}}
_ ->
{:continue, params, state}
end
Expand Down
40 changes: 40 additions & 0 deletions lib/fennec/reservation_log.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Fennec.ReservationLog do
@moduledoc false

## Runtime support for storing and fetching pending reservations.
## I.e. an ETS table owner process.

alias Fennec.TURN.Reservation
alias Jerboa.Format.Body.Attribute.ReservationToken

def start_link() do
Agent.start_link(fn -> init_db(__MODULE__) end, name: __MODULE__)
end

def child_spec() do
Supervisor.Spec.worker(Fennec.ReservationLog, [])
end

@spec register(Reservation.t) :: :ok | {:error, :exists}
def register(%Reservation{} = r) do
case :ets.insert_new(__MODULE__, Reservation.to_tuple(r)) do
false -> {:error, :exists}
_ -> :ok
end
end

@spec take(ReservationToken.t) :: Reservation.t | nil
def take(%ReservationToken{} = token) do
case :ets.take(__MODULE__, token.value) do
[] -> nil
[r] -> Reservation.from_tuple(r)
end
end

defp init_db(table_name) do
## TODO: largely guesswork here, not load tested
perf_opts = [write_concurrency: true]
^table_name = :ets.new(table_name, [:public, :named_table] ++ perf_opts)
end

end
41 changes: 41 additions & 0 deletions lib/fennec/turn/reservation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Fennec.TURN.Reservation do
@moduledoc false

## A Reservation represents a relay address reserved
## by an allocation request with a positive `EvenPort.reserved?`.
## The relay address is created along with a RESERVATION-TOKEN
## which is returned to the client requesting the allocation,
## and is stored until another allocation request with the same
## reservation token is sent by the client.
## The Reservation is then turned into a full-blown Allocation.
## This mechanism is used to allocate consecutive port pairs,
## for example for RTP and RTCP transmissions.

alias Jerboa.Format.Body.Attribute.ReservationToken

defstruct [:token, :socket]

@type t :: %__MODULE__{
token: ReservationToken.t,
socket: Fennec.UDP.socket
}

@spec new(Fennec.UDP.socket) :: t
def new(socket) do
%__MODULE__{token: ReservationToken.new(),
socket: socket}
end

## Only intended for storing in ETS
def to_tuple(%__MODULE__{} = r) do
%__MODULE__{token: %ReservationToken{value: token}} = r
{token, r.socket}
end

## Only intended for storing in ETS
def from_tuple({token, socket}) when is_binary(token) and is_port(socket) do
%__MODULE__{token: %ReservationToken{value: token},
socket: socket}
end

end
15 changes: 8 additions & 7 deletions lib/fennec/udp/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ defmodule Fennec.UDP.Dispatcher do

# Dispatches data to worker associated with client's
# server-reflexive IP and port number
@spec dispatch(atom, atom, UDP.socket, Fennec.ip, Fennec.portn, binary) :: term
def dispatch(dispatcher, worker_sup, socket, ip, port, data) do
case find_or_start_worker(dispatcher, worker_sup, socket, ip, port) do
@spec dispatch(atom, atom, Fennec.client_info, binary) :: term
def dispatch(dispatcher, worker_sup, client, data) do
case find_or_start_worker(dispatcher, worker_sup, client) do
{:ok, pid} ->
Worker.process_data(pid, data)
_ ->
Expand All @@ -39,16 +39,17 @@ defmodule Fennec.UDP.Dispatcher do
Registry.lookup(dispatcher, key(ip, port))
end

defp find_or_start_worker(dispatcher, worker_sup, socket, ip, port) do
defp find_or_start_worker(dispatcher, worker_sup, client) do
%{ip: ip, port: port} = client
case lookup_worker(dispatcher, ip, port) do
[{_owner, pid}] -> {:ok, pid}
[] ->
start_worker(worker_sup, socket, ip, port)
start_worker(worker_sup, client)
end
end

defp start_worker(worker_sup, socket, ip, port) do
case Worker.start(worker_sup, socket, ip, port) do
defp start_worker(worker_sup, client) do
case Worker.start(worker_sup, client) do
{:ok, pid} ->
{:ok, pid}
_ ->
Expand Down
5 changes: 3 additions & 2 deletions lib/fennec/udp/receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ defmodule Fennec.UDP.Receiver do
end

def handle_info({:udp, socket, ip, port, data}, %{socket: socket} = state) do
_ = Fennec.UDP.Dispatcher.dispatch(state.dispatcher, state.worker_sup,
state.socket, ip, port, data)
## TODO: refactor to a proper struct?
client = %{socket: socket, ip: ip, port: port}
_ = Fennec.UDP.Dispatcher.dispatch(state.dispatcher, state.worker_sup, client, data)
{:noreply, state}
end
end

0 comments on commit edeba11

Please sign in to comment.