Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions lib/ex_rtmp/client/media_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule ExRTMP.Client.MediaProcessor do
@moduledoc false

alias ExFLV.Tag.{AudioData, VideoData}
alias ExRTMP.Message

@type track :: {:codec, atom(), binary()}
@type video_sample ::
Expand All @@ -24,20 +25,20 @@ defmodule ExRTMP.Client.MediaProcessor do
@spec new() :: t()
def new(), do: %__MODULE__{}

@spec push_video(t(), non_neg_integer(), iodata()) :: {video_return(), t()}
def push_video(processor, timestamp, data) do
data
@spec push_video(Message.t(), t()) :: {video_return(), t()}
def push_video(message, processor) do
message.payload
|> IO.iodata_to_binary()
|> VideoData.parse!()
|> handle_video_tag(timestamp, processor)
|> handle_video_tag(message.timestamp, processor)
end

@spec push_audio(t(), non_neg_integer(), iodata()) :: {audio_return(), t()}
def push_audio(processor, timestamp, data) do
data
@spec push_audio(Message.t(), t()) :: {audio_return(), t()}
def push_audio(message, processor) do
message.payload
|> IO.iodata_to_binary()
|> AudioData.parse!()
|> handle_audio_tag(timestamp, processor)
|> handle_audio_tag(message.timestamp, processor)
end

defp handle_video_tag(%VideoData{codec_id: :avc} = tag, timestamp, processor) do
Expand Down
14 changes: 4 additions & 10 deletions lib/ex_rtmp/client/stream_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,14 @@ defmodule ExRTMP.Client.StreamContext do
@doc false
@spec handle_video_data(t(), ExRTMP.Message.t()) :: {MediaProcessor.video_return(), t()}
def handle_video_data(stream_ctx, message) do
{data, processor} =
MediaProcessor.push_video(stream_ctx.media_processor, message.timestamp, message.payload)

stream_ctx = %{stream_ctx | media_processor: processor}
{data, stream_ctx}
{data, processor} = MediaProcessor.push_video(message, stream_ctx.media_processor)
{data, %{stream_ctx | media_processor: processor}}
end

@doc false
@spec handle_audio_data(t(), ExRTMP.Message.t()) :: {MediaProcessor.audio_return(), t()}
def handle_audio_data(stream_ctx, message) do
{data, processor} =
MediaProcessor.push_audio(stream_ctx.media_processor, message.timestamp, message.payload)

stream_ctx = %{stream_ctx | media_processor: processor}
{data, stream_ctx}
{data, processor} = MediaProcessor.push_audio(message, stream_ctx.media_processor)
{data, %{stream_ctx | media_processor: processor}}
end
end
25 changes: 22 additions & 3 deletions lib/ex_rtmp/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ defmodule ExRTMP.Server do

The server listens for incoming RTMP client connections and spawns a new
`ExRTMP.Server.ClientSession` process for each connected client.

## Handling Media
When `demux` is set to `true`, the server will demux the incoming RTMP
streams into audio and video frames before passing them to the handler module.

The format of the data received by the handler is:
* `{:codec, codec_type, init_data}` - The codec type and initialization data.
* `{:sample, payload, dts, pts, keyframe?}` - A video sample, the payload depends on the codec type. In the case of `avc`,
the payload is a list of NAL units, for other codecs, it is the raw video frame data.
* `{:sample, payload, timestamp}` - An audio sample.

If `demux` is set to `false`, the handler module will receive the raw RTMP
data as is.
"""

use GenServer
Expand All @@ -15,7 +28,8 @@ defmodule ExRTMP.Server do
@type start_options :: [
{:port, :inet.port_number()},
{:handler, module()},
{:handler_options, any()}
{:handler_options, any()},
{:demux, boolean()}
]

@default_port 1935
Expand All @@ -42,6 +56,9 @@ defmodule ExRTMP.Server do

* `handler_options` - A keyword list of options that will be passed to the
handler module when it is started. This option is optional.

* `demux` - Whether the server will demux the incoming RTMP streams into
audio and video frames. Defaults to `true`. See [Handling Media](#module-handling-media) below.
"""
@spec start_link(start_options()) :: GenServer.on_start()
def start_link(opts) do
Expand Down Expand Up @@ -76,7 +93,8 @@ defmodule ExRTMP.Server do
socket: server_socket,
pid: self(),
handler: opts[:handler] || raise("Handler module is required"),
handler_options: opts[:handler_options]
handler_options: opts[:handler_options],
demux: Keyword.get(opts, :demux, true)
}

listener = spawn_link(fn -> accept_client_connection(state) end)
Expand Down Expand Up @@ -121,7 +139,8 @@ defmodule ExRTMP.Server do
ClientSession.start(
socket: client_socket,
handler: state.handler,
handler_options: state.handler_options
handler_options: state.handler_options,
demux: state.demux
)

:ok = :gen_tcp.controlling_process(client_socket, pid)
Expand Down
32 changes: 28 additions & 4 deletions lib/ex_rtmp/server/client_session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule ExRTMP.Server.ClientSession do
require Logger

alias ExRTMP.ChunkParser
alias ExRTMP.Client.MediaProcessor
alias ExRTMP.Message
alias ExRTMP.Message.Command.NetConnection
alias ExRTMP.Message.Command.NetConnection.{CreateStream, Response}
Expand All @@ -27,14 +28,16 @@ defmodule ExRTMP.Server.ClientSession do
handler_mod: module(),
handler_state: any(),
state: state(),
stream_id: non_neg_integer() | nil
stream_id: non_neg_integer() | nil,
media_processor: MediaProcessor.t() | nil
}

@enforce_keys [:socket]
defstruct @enforce_keys ++
[
:handler_mod,
:handler_state,
:media_processor,
chunk_parser: ChunkParser.new(),
state: :init,
stream_id: nil
Expand Down Expand Up @@ -78,7 +81,8 @@ defmodule ExRTMP.Server.ClientSession do
state = %State{
handler_mod: handler_mod,
handler_state: handler_mod.init(options[:handler_options]),
socket: options[:socket]
socket: options[:socket],
media_processor: if(options[:demux], do: MediaProcessor.new())
}

{:ok, state, {:continue, :handshake}}
Expand Down Expand Up @@ -176,7 +180,7 @@ defmodule ExRTMP.Server.ClientSession do
state
end

defp handle_message(%{type: 8} = message, %{state: :publishing} = state) do
defp handle_message(%{type: 8} = message, %{state: :publishing, media_processor: nil} = state) do
handler_state =
state.handler_mod.handle_audio_data(
message.timestamp,
Expand All @@ -187,7 +191,17 @@ defmodule ExRTMP.Server.ClientSession do
%{state | handler_state: handler_state}
end

defp handle_message(%{type: 9} = message, %{state: :publishing} = state) do
defp handle_message(%{type: 8} = message, %{state: :publishing} = state) do
{media, processor} = MediaProcessor.push_audio(message, state.media_processor)
mod = state.handler_mod

handler_state =
Enum.reduce(List.wrap(media), state.handler_state, &mod.handle_audio_data(0, &1, &2))

%{state | handler_state: handler_state, media_processor: processor}
end

defp handle_message(%{type: 9} = message, %{state: :publishing, media_processor: nil} = state) do
handler_state =
state.handler_mod.handle_video_data(
message.timestamp,
Expand All @@ -198,6 +212,16 @@ defmodule ExRTMP.Server.ClientSession do
%{state | handler_state: handler_state}
end

defp handle_message(%{type: 9} = message, %{state: :publishing} = state) do
{media, processor} = MediaProcessor.push_video(message, state.media_processor)
mod = state.handler_mod

handler_state =
Enum.reduce(List.wrap(media), state.handler_state, &mod.handle_video_data(0, &1, &2))

%{state | handler_state: handler_state, media_processor: processor}
end

defp handle_message(%{type: type}, state) when type == 8 or type == 9, do: state

defp handle_message(%{type: 18, payload: %Metadata{data: data}}, state) do
Expand Down