Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 33 additions & 35 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule ExWebRTC.PeerConnection do
MediaStreamTrack,
RTPTransceiver,
RTPSender,
RTPReceiver,
SDPUtils,
SessionDescription,
Utils
Expand Down Expand Up @@ -907,29 +906,8 @@ defmodule ExWebRTC.PeerConnection do
end)

rtp_stats =
Enum.flat_map(state.transceivers, fn tr ->
tr_stats = %{kind: tr.kind, mid: tr.mid}

case tr.current_direction do
:sendonly ->
stats = RTPSender.get_stats(tr.sender, timestamp)
[Map.merge(stats, tr_stats)]

:recvonly ->
stats = RTPReceiver.get_stats(tr.receiver, timestamp)
Enum.map(stats, &Map.merge(&1, tr_stats))

:sendrecv ->
sender_stats = RTPSender.get_stats(tr.sender, timestamp)
receiver_stats = RTPReceiver.get_stats(tr.receiver, timestamp)

[Map.merge(sender_stats, tr_stats)] ++
Enum.map(receiver_stats, &Map.merge(&1, tr_stats))

_other ->
[]
end
end)
state.transceivers
|> Enum.flat_map(&RTPTransceiver.get_stats(&1, timestamp))
|> Map.new(fn stats -> {stats.id, stats} end)

stats = %{
Expand Down Expand Up @@ -1026,23 +1004,28 @@ defmodule ExWebRTC.PeerConnection do
@impl true
def handle_cast({:send_pli, track_id, rid}, state) do
state.transceivers
|> Enum.find(fn tr -> tr.receiver.track.id == track_id end)
|> Enum.with_index()
|> Enum.find(fn {tr, _idx} -> tr.receiver.track.id == track_id end)
|> case do
%{receiver: %{layers: %{^rid => %{ssrc: ssrc}}}} when ssrc != nil ->
encoded =
%ExRTCP.Packet.PayloadFeedback.PLI{sender_ssrc: 1, media_ssrc: ssrc}
|> ExRTCP.Packet.encode()
{tr, idx} ->
case RTPTransceiver.get_pli(tr, rid) do
{pli, tr} ->
encoded = ExRTCP.Packet.encode(pli)
:ok = DTLSTransport.send_rtcp(state.dtls_transport, encoded)
{:noreply, %{state | transceivers: List.replace_at(state.transceivers, idx, tr)}}

:ok = DTLSTransport.send_rtcp(state.dtls_transport, encoded)
:error ->
Logger.warning(
"Unable to send PLI for track #{inspect(track_id)}, rid #{inspect(rid)}"
)

{:noreply, state}
end

nil ->
Logger.warning("Attempted to send PLI for non existent track #{inspect(track_id)}")

_other ->
Logger.warning("Unable to send PLI for track #{inspect(track_id)}, rid #{inspect(rid)}")
{:noreply, state}
end

{:noreply, state}
end

@impl true
Expand Down Expand Up @@ -1852,6 +1835,21 @@ defmodule ExWebRTC.PeerConnection do
end
end

defp handle_rtcp_packet(state, %ExRTCP.Packet.PayloadFeedback.PLI{} = pli) do
state.transceivers
|> Enum.with_index()
|> Enum.find(fn {tr, _idx} -> tr.sender.ssrc == pli.media_ssrc end)
|> case do
nil ->
state

{tr, idx} ->
tr = RTPTransceiver.receive_pli(tr, pli)
transceivers = List.replace_at(state.transceivers, idx, tr)
%{state | transceivers: transceivers}
end
end

defp handle_rtcp_packet(state, _packet), do: state

defp do_get_description(nil, _candidates), do: nil
Expand Down
36 changes: 32 additions & 4 deletions lib/ex_webrtc/rtp_receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule ExWebRTC.RTPReceiver do

require Logger

alias ExRTCP.Packet.TransportFeedback.NACK
alias ExRTCP.Packet.{TransportFeedback.NACK, PayloadFeedback.PLI}
alias ExSDP.Attribute.Extmap
alias ExWebRTC.{MediaStreamTrack, Utils, RTPCodecParameters}
alias __MODULE__.{NACKGenerator, ReportRecorder, SimulcastDemuxer}
Expand All @@ -29,6 +29,8 @@ defmodule ExWebRTC.RTPReceiver do
bytes_received: non_neg_integer(),
packets_received: non_neg_integer(),
markers_received: non_neg_integer(),
nack_count: non_neg_integer(),
pli_count: non_neg_integer(),
report_recorder: ReportRecorder.t(),
nack_generator: NACKGenerator.t()
}
Expand Down Expand Up @@ -221,29 +223,53 @@ defmodule ExWebRTC.RTPReceiver do
Enum.map_reduce(receiver.layers, [], fn {rid, layer}, nacks ->
{nack, nack_generator} = NACKGenerator.get_feedback(layer.nack_generator)
nacks = if(nack != nil, do: [nack | nacks], else: nacks)
layer = %{layer | nack_generator: nack_generator}

layer = %{
layer
| nack_generator: nack_generator,
nack_count: layer.nack_count + if(nack != nil, do: 1, else: 0)
}

{{rid, layer}, nacks}
end)

receiver = %{receiver | layers: Map.new(layers)}
{nacks, receiver}
end

@doc false
@spec get_pli(receiver(), String.t() | nil) :: {PLI.t(), receiver()} | :error
def get_pli(receiver, rid) do
case receiver do
%{layers: %{^rid => %{ssrc: ssrc} = layer}} when ssrc != nil ->
layer = %{layer | pli_count: layer.pli_count + 1}
pli = %PLI{sender_ssrc: 1, media_ssrc: ssrc}
{pli, %{receiver | layers: Map.put(receiver.layers, rid, layer)}}

_other ->
:error
end
end

@doc false
@spec get_stats(receiver(), non_neg_integer()) :: [map()]
def get_stats(receiver, timestamp) do
Enum.map(receiver.layers, fn {rid, layer} ->
id = if(rid == nil, do: receiver.track.id, else: "#{receiver.track.id}:#{rid}")
codec = receiver.codec && String.split(receiver.codec.mime_type, "/") |> List.last()

%{
id: id,
rid: rid,
codec: codec,
type: :inbound_rtp,
timestamp: timestamp,
ssrc: layer.ssrc,
bytes_received: layer.bytes_received,
packets_received: layer.packets_received,
markers_received: layer.markers_received
markers_received: layer.markers_received,
nack_count: layer.nack_count,
pli_count: layer.pli_count
}
end)
end
Expand All @@ -259,7 +285,9 @@ defmodule ExWebRTC.RTPReceiver do
packets_received: 0,
markers_received: 0,
report_recorder: report_recorder,
nack_generator: %NACKGenerator{}
nack_generator: %NACKGenerator{},
nack_count: 0,
pli_count: 0
}
end
end
37 changes: 33 additions & 4 deletions lib/ex_webrtc/rtp_sender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule ExWebRTC.RTPSender do
Implementation of the [RTCRtpSender](https://www.w3.org/TR/webrtc/#rtcrtpsender-interface).
"""

alias ExRTCP.Packet.{TransportFeedback.NACK, PayloadFeedback.PLI}
alias ExWebRTC.{MediaStreamTrack, RTPCodecParameters, Utils}
alias ExSDP.Attribute.Extmap
alias __MODULE__.{NACKResponder, ReportRecorder}
Expand All @@ -24,7 +25,11 @@ defmodule ExWebRTC.RTPSender do
rtx_ssrc: non_neg_integer() | nil,
packets_sent: non_neg_integer(),
bytes_sent: non_neg_integer(),
retransmitted_packets_sent: non_neg_integer(),
retransmitted_bytes_sent: non_neg_integer(),
Comment on lines +28 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: we can make this a bit shortert

Suggested change
retransmitted_packets_sent: non_neg_integer(),
retransmitted_bytes_sent: non_neg_integer(),
rtx_packets_sent: non_neg_integer(),
rtx_bytes_sent: non_neg_integer(),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a stat from W3C, so I'd probably leave it as it is for the sake of compatibility. If it was for me to decide, I'd also use rtx_ prefix.

markers_sent: non_neg_integer(),
nack_count: non_neg_integer(),
pli_count: non_neg_integer(),
reports?: boolean(),
outbound_rtx?: boolean(),
report_recorder: ReportRecorder.t(),
Expand Down Expand Up @@ -86,7 +91,11 @@ defmodule ExWebRTC.RTPSender do
mid: mid,
packets_sent: 0,
bytes_sent: 0,
retransmitted_packets_sent: 0,
retransmitted_bytes_sent: 0,
markers_sent: 0,
nack_count: 0,
pli_count: 0,
reports?: :rtcp_reports in features,
outbound_rtx?: :outbound_rtx in features,
report_recorder: %ReportRecorder{clock_rate: codec && codec.clock_rate},
Expand Down Expand Up @@ -159,6 +168,17 @@ defmodule ExWebRTC.RTPSender do

data = ExRTP.Packet.encode(packet)

sender =
if rtx? do
%{
sender
| retransmitted_packets_sent: sender.retransmitted_packets_sent + 1,
retransmitted_bytes_sent: sender.retransmitted_bytes_sent + byte_size(data)
}
else
sender
end

sender = %{
sender
| packets_sent: sender.packets_sent + 1,
Expand All @@ -172,15 +192,20 @@ defmodule ExWebRTC.RTPSender do
end

@doc false
@spec receive_nack(sender(), ExRTCP.Packet.TransportFeedback.NACK.t()) ::
{[ExRTP.Packet.t()], sender()}
@spec receive_nack(sender(), NACK.t()) :: {[ExRTP.Packet.t()], sender()}
def receive_nack(sender, nack) do
{packets, nack_responder} = NACKResponder.get_rtx(sender.nack_responder, nack)
sender = %{sender | nack_responder: nack_responder}
sender = %{sender | nack_responder: nack_responder, nack_count: sender.nack_count + 1}

{packets, sender}
end

@doc false
@spec receive_pli(sender(), PLI.t()) :: sender()
def receive_pli(sender, _pli) do
%{sender | pli_count: sender.pli_count + 1}
end

@doc false
@spec get_reports(sender()) :: {[ExRTCP.Packet.SenderReport.t()], sender()}
def get_reports(sender) do
Expand All @@ -204,7 +229,11 @@ defmodule ExWebRTC.RTPSender do
ssrc: sender.ssrc,
packets_sent: sender.packets_sent,
bytes_sent: sender.bytes_sent,
markers_sent: sender.markers_sent
markers_sent: sender.markers_sent,
retransmitted_packets_sent: sender.retransmitted_packets_sent,
retransmitted_bytes_sent: sender.retransmitted_bytes_sent,
nack_count: sender.nack_count,
pli_count: sender.pli_count
}
end
end
44 changes: 43 additions & 1 deletion lib/ex_webrtc/rtp_transceiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule ExWebRTC.RTPTransceiver do
Utils
}

alias ExRTCP.Packet.{ReceiverReport, SenderReport, TransportFeedback.NACK}
alias ExRTCP.Packet.{ReceiverReport, SenderReport, TransportFeedback.NACK, PayloadFeedback.PLI}

@report_interval 1000
@nack_interval 100
Expand Down Expand Up @@ -355,6 +355,13 @@ defmodule ExWebRTC.RTPTransceiver do
{packets, transceiver}
end

@doc false
@spec receive_pli(transceiver(), PLI.t()) :: transceiver()
def receive_pli(transceiver, pli) do
sender = RTPSender.receive_pli(transceiver.sender, pli)
%{transceiver | sender: sender}
end

@doc false
@spec send_packet(transceiver(), ExRTP.Packet.t(), boolean()) :: {binary(), transceiver()}
def send_packet(transceiver, packet, rtx?) do
Expand Down Expand Up @@ -395,6 +402,15 @@ defmodule ExWebRTC.RTPTransceiver do
{nacks, transceiver}
end

@doc false
@spec get_pli(transceiver(), String.t() | nil) :: {PLI.t(), transceiver()} | :error
def get_pli(transceiver, rid) do
case RTPReceiver.get_pli(transceiver.receiver, rid) do
:error -> :error
{pli, receiver} -> {pli, %{transceiver | receiver: receiver}}
end
end

@doc false
@spec to_answer_mline(transceiver(), ExSDP.Media.t(), Keyword.t()) :: ExSDP.Media.t()
def to_answer_mline(transceiver, mline, opts) do
Expand Down Expand Up @@ -467,6 +483,32 @@ defmodule ExWebRTC.RTPTransceiver do
%{transceiver | direction: :inactive, stopping: true}
end

@doc false
@spec get_stats(transceiver(), non_neg_integer()) :: [map()]
def get_stats(transceiver, timestamp) do
tr_stats = %{kind: transceiver.kind, mid: transceiver.mid}

case transceiver.current_direction do
:sendonly ->
stats = RTPSender.get_stats(transceiver.sender, timestamp)
[Map.merge(stats, tr_stats)]

:recvonly ->
stats = RTPReceiver.get_stats(transceiver.receiver, timestamp)
Enum.map(stats, &Map.merge(&1, tr_stats))

:sendrecv ->
sender_stats = RTPSender.get_stats(transceiver.sender, timestamp)
receiver_stats = RTPReceiver.get_stats(transceiver.receiver, timestamp)

[Map.merge(sender_stats, tr_stats)] ++
Enum.map(receiver_stats, &Map.merge(&1, tr_stats))

_other ->
[]
end
end

defp to_mline(transceiver, opts) do
pt = Enum.map(transceiver.codecs, fn codec -> codec.payload_type end)

Expand Down
10 changes: 8 additions & 2 deletions test/ex_webrtc/rtp_receiver_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ defmodule ExWebRTC.RTPReceiverTest do
ssrc: 1234,
bytes_received: byte_size(raw_packet1),
packets_received: 1,
markers_received: 0
markers_received: 0,
codec: "opus",
nack_count: 0,
pli_count: 0
}
] == RTPReceiver.get_stats(receiver, timestamp)

Expand All @@ -45,7 +48,10 @@ defmodule ExWebRTC.RTPReceiverTest do
ssrc: 1234,
bytes_received: byte_size(raw_packet1) + byte_size(raw_packet2),
packets_received: 2,
markers_received: 1
markers_received: 1,
codec: "opus",
nack_count: 0,
pli_count: 0
}
] == RTPReceiver.get_stats(receiver, timestamp)
end
Expand Down
Loading