Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allocate reserved relay address #39

Merged
merged 23 commits into from
Apr 28, 2017
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
cca1216
Refactor UDP.allocate test helper
erszcz Apr 24, 2017
d2ace5f
Test reserving a port / allocation with a RESERVATION-TOKEN
erszcz Apr 24, 2017
3cff668
Update Jerboa: ReservationToken.new is now available
erszcz Apr 27, 2017
ccee493
wip: Reserve consecutive relay port
erszcz Apr 24, 2017
5128d69
wip: do_reserve_another_relay
erszcz Apr 26, 2017
5c673e5
Extend client_info and refactor the way it's passed around
erszcz Apr 26, 2017
fc6211f
wip: Start a new worker as part of reserving the new relay address
erszcz Apr 26, 2017
1afe552
wip: Save state before backtracking
erszcz Apr 26, 2017
844154a
Introduce ReservationLog
erszcz Apr 27, 2017
3dddeeb
Store Reservations as tuples
erszcz Apr 27, 2017
2495291
Register newly allocated Reservations in the log
erszcz Apr 27, 2017
e2de158
Remove unused parameters
erszcz Apr 27, 2017
86919c9
Use more lenient lookup method
erszcz Apr 27, 2017
39d0bab
Leave TODO: expire the reservation
erszcz Apr 27, 2017
c822fe8
Clean up retval/error propagation
erszcz Apr 27, 2017
396bdea
Fix a crucial typo
erszcz Apr 27, 2017
1c9faad
Allocate reserved port if RESERVATION-TOKEN is passed
erszcz Apr 27, 2017
648c993
Handle invalid/expired reservations
erszcz Apr 27, 2017
e7cdf77
Adhere more closely to the RFC - make the test pass
erszcz Apr 27, 2017
5f5c333
Remove now-unused function
erszcz Apr 27, 2017
f91bf83
Remove code duplicated by a sloppy conflict resolution
erszcz Apr 27, 2017
4aa64aa
Fix test for the top-level supervisor children
erszcz Apr 28, 2017
562c3db
[skip ci] Fix style
erszcz Apr 28, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/fennec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ defmodule Fennec do
@type ip :: :inet.ip_address
@type portn :: :inet.port_number

@type client_info :: %{ip: Fennec.ip, port: Fennec.portn}
@type client_info :: %{socket: Fennec.UDP.socket,
ip: Fennec.ip, port: Fennec.portn}
end
2 changes: 1 addition & 1 deletion lib/fennec/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Fennec.Application do

def start(_type, _args) do
opts = [strategy: :one_for_one, name: Fennec.Supervisor]
Supervisor.start_link(servers(), opts)
Supervisor.start_link([Fennec.ReservationLog.child_spec()] ++ servers(), opts)
end

defp servers do
Expand Down
156 changes: 122 additions & 34 deletions lib/fennec/evaluator/allocate/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ defmodule Fennec.Evaluator.Allocate.Request do
alias Jerboa.Format.Body.Attribute.ErrorCode
alias Jerboa.Params
alias Fennec.TURN
alias Fennec.TURN.Reservation

require Integer
require Logger

@even_port_max_retries 100
@create_relays_max_retries 100

@spec service(Params.t, Fennec.client_info, Fennec.UDP.server_opts, TURN.t)
:: {Params.t, TURN.t}
def service(params, client, server, turn_state) do
request_status =
{:continue, params, %{even_port: nil}}
{:continue, params, %{}}
|> maybe(&verify_existing_allocation/5, [client, server, turn_state])
|> maybe(&verify_requested_transport/2)
|> maybe(&verify_dont_fragment/2)
Expand All @@ -36,12 +38,16 @@ defmodule Fennec.Evaluator.Allocate.Request do
end

defp allocation_params(params, %{ip: a, port: p}, server,
turn_state = %TURN{allocation: allocation}) do
turn_state = %TURN{allocation: allocation},
reservation_token) do
%TURN.Allocation{socket: socket, expire_at: expire_at} = allocation
{:ok, {socket_addr, port}} = :inet.sockname(socket)
addr = server[:relay_ip] || socket_addr
lifetime = max(0, expire_at - Fennec.Time.system_time(:second))
attrs = [
attrs = case reservation_token do
:not_requested -> []
%Attribute.ReservationToken{} -> [reservation_token]
end ++ [
%Attribute.XORMappedAddress{
family: family(a),
address: a,
Expand All @@ -60,39 +66,125 @@ defmodule Fennec.Evaluator.Allocate.Request do
end

defp allocate(params, state, client, server, turn_state) do
{:ok, socket} = create_relay(state, server)
allocation = %Fennec.TURN.Allocation{
socket: socket,
expire_at: Fennec.Time.system_time(:second) + TURN.Allocation.default_lifetime(),
req_id: Params.get_id(params),
owner_username: owner_username(params)
}

new_turn_state = %{turn_state | allocation: allocation}
{:respond, allocation_params(params, client, server, new_turn_state)}
end

defp create_relay( state, server, retries \\ @even_port_max_retries)
defp create_relay(_state, _server, retries)
when retries < 0, do: {:error, :even_port_max_retries}
defp create_relay( state, server, retries) do
addr = server[:relay_ip]
## TODO: {:active, true} is not an option for a production system!
{:ok, socket} = :gen_udp.open(0, [:binary, active: true, ip: addr])
{:ok, {_, port}} = :inet.sockname(socket)
if state.even_port != nil and not Integer.is_even(port) do
:gen_udp.close(socket)
create_relay(state, server, retries - 1)
else
{:ok, socket}
case create_relays(params, state, server) do
{:error, error_code} ->
{:error, error_code}
{:ok, socket, reservation_token} ->
allocation = %Fennec.TURN.Allocation{
socket: socket,
expire_at: Fennec.Time.system_time(:second) + TURN.Allocation.default_lifetime(),
req_id: Params.get_id(params),
owner_username: owner_username(params)
}
new_turn_state = %{turn_state | allocation: allocation}
{:respond, allocation_params(params, client, server, new_turn_state,
reservation_token)}
end
end

defp create_relays(params, state, server) do
status =
{:continue, params, create_relays_state(state)}
|> maybe(&requests_reserved_port?/2)
|> maybe(&open_this_relay/3, [server])
|> maybe(&reserve_another_relay/3, [server])
case status do
{:respond, state} ->
{:ok, state.this_socket, state.new_reservation_token}
{:continue, _params, state} ->
{:ok, state.this_socket, state.new_reservation_token}
{:error, error_code} ->
{:error, error_code}
end
end

defp create_relays_state(allocate_state) do
%{this_socket: nil,
this_port: nil,
new_reservation_token: :not_requested,
retries: Map.get(allocate_state, :retries) || @create_relays_max_retries}
end

defp requests_reserved_port?(params, state) do
case Params.get_attr(params, Attribute.ReservationToken) do
nil -> {:continue, params, state}
%Attribute.ReservationToken{} = token ->
case Fennec.ReservationLog.take(token) do
nil ->
Logger.info fn -> "no reservation: #{inspect(token)}" end
{:error, ErrorCode.new(:insufficient_capacity)}
%Reservation{} = r ->
{:respond, %{state | this_socket: r.socket}}
end
end
end

defp open_this_relay(_params, %{retries: r}, _server) when r < 0 do
Logger.warn(:max_retries)
{:error, ErrorCode.new(:insufficient_capacity)}
end
defp open_this_relay( params, state, server) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a space before first param.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. It looked better when the signature was:

  defp open_this_relay(_params, %{retries: r}, _server) when r < 0, do: ...
  defp open_this_relay( params, state, server) do ...

opts = udp_opts(server)
case {Params.get_attr(params, Attribute.EvenPort),
:gen_udp.open(0, opts)} do
{nil, {:ok, socket}} ->
{:continue, params, %{state | this_socket: socket}}
{%Attribute.EvenPort{}, {:ok, socket}} ->
{:ok, {_, port}} = :inet.sockname(socket)
if Integer.is_even(port) do
{:continue, params,
%{state | this_socket: socket, this_port: port}}
else
:gen_udp.close(socket)
new_state = %{state | retries: state.retries - 1}
open_this_relay(params, new_state, server)
end
{_, {:error, reason}} ->
Logger.warn(":gen_udp.open/2 error: #{reason}, port: 0, opts: #{opts}")
{:error, ErrorCode.new(:insufficient_capacity)}
end
end

defp reserve_another_relay(params, state, server) do
case Params.get_attr(params, Attribute.EvenPort) do
nil -> {:continue, params, state}
%Attribute.EvenPort{reserved?: false} -> {:continue, params, state}
%Attribute.EvenPort{reserved?: true} ->
port = state.this_port + 1
opts = udp_opts(server)
case :gen_udp.open(port, opts) do
{:error, :eaddrinuse} ->
## We can't allocate a pair of consecutive ports.
## We're jumping back to before we opened state.this_socket!
:gen_udp.close(state.this_socket)
create_relays(params, %{retries: state.retries - 1}, server)
{:error, reason} ->
Logger.warn(":gen_udp.open/2 error: #{reason}, port: #{port}, opts: #{opts}")
{:error, ErrorCode.new(:insufficient_capacity)}
{:ok, socket} ->
token = do_reserve_another_relay(socket)
{:continue, params, %{state | new_reservation_token: token}}
end
end
end

defp do_reserve_another_relay(socket) do
r = Reservation.new(socket)
:ok = Fennec.ReservationLog.register(r)
## TODO: expire the reservation!
r.token
end

defp udp_opts(server) do
## TODO: {:active, true} is not an option for a production system!
[:binary, active: true, ip: server[:relay_ip]]
end

defp verify_existing_allocation(params, state, client, server, turn_state) do
req_id = Params.get_id(params)
case turn_state do
%TURN{allocation: %TURN.Allocation{req_id: ^req_id}} ->
{:respond, allocation_params(params, client, server, turn_state)}
{:respond, allocation_params(params, client, server, turn_state, :not_requested)}
%TURN{allocation: %TURN.Allocation{}} ->
{:error, ErrorCode.new(:allocation_mismatch)}
%TURN{allocation: nil} ->
Expand Down Expand Up @@ -125,8 +217,6 @@ defmodule Fennec.Evaluator.Allocate.Request do
case Params.get_attr(params, Attribute.ReservationToken) do
%Attribute.ReservationToken{} when even_port != nil ->
{:error, ErrorCode.new(:bad_request)}
%Attribute.ReservationToken{} ->
{:error, ErrorCode.new(:unknown_attribute)} # Currently unsupported
_ ->
{:continue, params, state}
end
Expand All @@ -137,8 +227,6 @@ defmodule Fennec.Evaluator.Allocate.Request do
case Params.get_attr(params, Attribute.EvenPort) do
%Attribute.EvenPort{} when reservation_token != nil ->
{:error, ErrorCode.new(:bad_request)}
%Attribute.EvenPort{} = ep ->
{:continue, params, %{state | even_port: ep}}
_ ->
{:continue, params, state}
end
Expand Down
40 changes: 40 additions & 0 deletions lib/fennec/reservation_log.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Fennec.ReservationLog do
@moduledoc false

## Runtime support for storing and fetching pending reservations.
## I.e. an ETS table owner process.

alias Fennec.TURN.Reservation
alias Jerboa.Format.Body.Attribute.ReservationToken

def start_link() do
Agent.start_link(fn -> init_db(__MODULE__) end, name: __MODULE__)
end

def child_spec() do
Supervisor.Spec.worker(Fennec.ReservationLog, [])
end

@spec register(Reservation.t) :: :ok | {:error, :exists}
def register(%Reservation{} = r) do
case :ets.insert_new(__MODULE__, Reservation.to_tuple(r)) do
false -> {:error, :exists}
_ -> :ok
end
end

@spec take(ReservationToken.t) :: Reservation.t | nil
def take(%ReservationToken{} = token) do
case :ets.take(__MODULE__, token.value) do
[] -> nil
[r] -> Reservation.from_tuple(r)
end
end

defp init_db(table_name) do
## TODO: largely guesswork here, not load tested
perf_opts = [write_concurrency: true]
^table_name = :ets.new(table_name, [:public, :named_table] ++ perf_opts)
end

end
43 changes: 43 additions & 0 deletions lib/fennec/turn/reservation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Fennec.TURN.Reservation do
@moduledoc false

## A Reservation represents a relay address reserved
## by an allocation request with a positive `EvenPort.reserved?`.
## The relay address is created along with a RESERVATION-TOKEN
## which is returned to the client requesting the allocation,
## and is stored until another allocation request with the same
## reservation token is sent by the client.
## The Reservation is then turned into a full-blown Allocation.
## This mechanism is used to allocate consecutive port pairs,
## for example for RTP and RTCP transmissions.

alias Jerboa.Format.Body.Attribute.ReservationToken

defstruct [:token, :socket]

@type t :: %__MODULE__{
token: ReservationToken.t,
socket: Fennec.UDP.socket
}

@spec new(Fennec.UDP.socket) :: t
def new(socket) do
%__MODULE__{token: ReservationToken.new(),
socket: socket}
end

@doc false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for @doc false if there is @moduledoc false.

## Only intended for storing in ETS
def to_tuple(%__MODULE__{} = r) do
%__MODULE__{token: %ReservationToken{value: token}} = r
{token, r.socket}
end

@doc false
## Only intended for storing in ETS
def from_tuple({token, socket}) when is_binary(token) and is_port(socket) do
%__MODULE__{token: %ReservationToken{value: token},
socket: socket}
end

end
15 changes: 8 additions & 7 deletions lib/fennec/udp/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ defmodule Fennec.UDP.Dispatcher do

# Dispatches data to worker associated with client's
# server-reflexive IP and port number
@spec dispatch(atom, atom, UDP.socket, Fennec.ip, Fennec.portn, binary) :: term
def dispatch(dispatcher, worker_sup, socket, ip, port, data) do
case find_or_start_worker(dispatcher, worker_sup, socket, ip, port) do
@spec dispatch(atom, atom, Fennec.client_info, binary) :: term
def dispatch(dispatcher, worker_sup, client, data) do
case find_or_start_worker(dispatcher, worker_sup, client) do
{:ok, pid} ->
Worker.process_data(pid, data)
_ ->
Expand All @@ -39,16 +39,17 @@ defmodule Fennec.UDP.Dispatcher do
Registry.lookup(dispatcher, key(ip, port))
end

defp find_or_start_worker(dispatcher, worker_sup, socket, ip, port) do
defp find_or_start_worker(dispatcher, worker_sup, client) do
%{ip: ip, port: port} = client
case lookup_worker(dispatcher, ip, port) do
[{_owner, pid}] -> {:ok, pid}
[] ->
start_worker(worker_sup, socket, ip, port)
start_worker(worker_sup, client)
end
end

defp start_worker(worker_sup, socket, ip, port) do
case Worker.start(worker_sup, socket, ip, port) do
defp start_worker(worker_sup, client) do
case Worker.start(worker_sup, client) do
{:ok, pid} ->
{:ok, pid}
_ ->
Expand Down
5 changes: 3 additions & 2 deletions lib/fennec/udp/receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ defmodule Fennec.UDP.Receiver do
end

def handle_info({:udp, socket, ip, port, data}, %{socket: socket} = state) do
_ = Fennec.UDP.Dispatcher.dispatch(state.dispatcher, state.worker_sup,
state.socket, ip, port, data)
## TODO: refactor to a proper struct?
client = %{socket: socket, ip: ip, port: port}
_ = Fennec.UDP.Dispatcher.dispatch(state.dispatcher, state.worker_sup, client, data)
{:noreply, state}
end
end
Loading