diff --git a/lib/jellyfish/component.ex b/lib/jellyfish/component.ex index fe43184b..c494eda5 100644 --- a/lib/jellyfish/component.ex +++ b/lib/jellyfish/component.ex @@ -10,26 +10,32 @@ defmodule Jellyfish.Component do alias Jellyfish.Component.{HLS, RTSP} + @callback metadata() :: map() + @enforce_keys [ :id, :type, - :engine_endpoint + :engine_endpoint, + :metadata ] defstruct @enforce_keys @type id :: String.t() @type component :: HLS | RTSP + @type metadata :: HLS.metadata() | RTSP.metadata() @typedoc """ This module contains: * `id` - component id * `type` - type of this component * `engine_endpoint` - engine endpoint for this component + * `metadata` - metadata of this component """ @type t :: %__MODULE__{ id: id(), type: component(), - engine_endpoint: Membrane.ChildrenSpec.child_definition() + engine_endpoint: Membrane.ChildrenSpec.child_definition(), + metadata: metadata() } @spec parse_type(String.t()) :: {:ok, component()} | {:error, :invalid_type} @@ -44,11 +50,14 @@ defmodule Jellyfish.Component do @spec new(component(), map()) :: {:ok, t()} | {:error, term()} def new(type, options) do with {:ok, endpoint} <- type.config(options) do + metadata = type.metadata() + {:ok, %__MODULE__{ id: UUID.uuid4(), type: type, - engine_endpoint: endpoint + engine_endpoint: endpoint, + metadata: metadata }} else {:error, _reason} = error -> error diff --git a/lib/jellyfish/component/hls.ex b/lib/jellyfish/component/hls.ex index 342d6215..c74aad81 100644 --- a/lib/jellyfish/component/hls.ex +++ b/lib/jellyfish/component/hls.ex @@ -4,6 +4,7 @@ defmodule Jellyfish.Component.HLS do """ @behaviour Jellyfish.Endpoint.Config + @behaviour Jellyfish.Component alias Membrane.RTC.Engine.Endpoint.HLS alias Membrane.RTC.Engine.Endpoint.HLS.{CompositorConfig, HLSConfig, MixerConfig} @@ -12,6 +13,8 @@ defmodule Jellyfish.Component.HLS do @segment_duration Time.seconds(4) @partial_segment_duration Time.milliseconds(400) + @type metadata :: %{playable: boolean()} + @impl true def config(options) do base_path = Application.fetch_env!(:jellyfish, :output_base_path) @@ -42,4 +45,7 @@ defmodule Jellyfish.Component.HLS do } }} end + + @impl true + def metadata(), do: %{playable: false} end diff --git a/lib/jellyfish/component/rtsp.ex b/lib/jellyfish/component/rtsp.ex index f2988b62..fecf842b 100644 --- a/lib/jellyfish/component/rtsp.ex +++ b/lib/jellyfish/component/rtsp.ex @@ -4,11 +4,14 @@ defmodule Jellyfish.Component.RTSP do """ @behaviour Jellyfish.Endpoint.Config + @behaviour Jellyfish.Component alias Membrane.RTC.Engine.Endpoint.RTSP alias JellyfishWeb.ApiSpec + @type metadata :: %{} + @impl true def config(%{engine_pid: engine} = options) do options = Map.drop(options, [:engine_pid, :room_id]) @@ -29,4 +32,7 @@ defmodule Jellyfish.Component.RTSP do {:error, _reason} = error -> error end end + + @impl true + def metadata(), do: %{} end diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 7b8a1738..e61dd381 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -227,6 +227,7 @@ defmodule Jellyfish.Room do {:reply, {:ok, component}, state} else {:error, :incompatible_codec} -> + Logger.warn("Unable to add component: incompatible codec, HLS needs 'h264' video codec.") {:reply, {:error, :incompatible_codec}, state} {:error, reason} -> @@ -321,6 +322,26 @@ defmodule Jellyfish.Room do {:noreply, state} end + @impl true + def handle_info({:playlist_playable, :audio, _playlist_idl}, state), do: {:noreply, state} + + @impl true + def handle_info({:playlist_playable, :video, _playlist_idl}, state) do + endpoint_id = + Enum.find_value(state.components, fn {id, %{type: type}} -> + if type == Component.HLS, do: id + end) + + Phoenix.PubSub.broadcast( + Jellyfish.PubSub, + "server_notification", + {:hls_playable, state.id, endpoint_id} + ) + + state = update_in(state, [:components, endpoint_id], &Map.put(&1, :playable, true)) + {:noreply, state} + end + @impl true def handle_info(info, state) do Logger.warn("Received unexpected info: #{inspect(info)}") diff --git a/lib/jellyfish_web/api_spec/component.ex b/lib/jellyfish_web/api_spec/component.ex index 76037a41..b00e859e 100644 --- a/lib/jellyfish_web/api_spec/component.ex +++ b/lib/jellyfish_web/api_spec/component.ex @@ -35,14 +35,30 @@ defmodule JellyfishWeb.ApiSpec.Component do }) end + defmodule Metadata do + @moduledoc false + + require OpenApiSpex + + OpenApiSpex.schema(%{ + title: "ComponentMetadata", + description: "Component-specific metadata", + type: :object, + properties: %{ + playable: %Schema{type: :boolean} + } + }) + end + OpenApiSpex.schema(%{ title: "Component", description: "Describes component", type: :object, properties: %{ id: %Schema{type: :string, description: "Assigned component id", example: "component-1"}, - type: Type + type: Type, + metadata: Metadata }, - required: [:id, :type] + required: [:id, :type, :metadata] }) end diff --git a/lib/jellyfish_web/controllers/component_json.ex b/lib/jellyfish_web/controllers/component_json.ex index da3ea629..1bfb4a38 100644 --- a/lib/jellyfish_web/controllers/component_json.ex +++ b/lib/jellyfish_web/controllers/component_json.ex @@ -15,7 +15,8 @@ defmodule JellyfishWeb.ComponentJSON do %{ id: component.id, - type: type + type: type, + metadata: component.metadata } end end diff --git a/lib/jellyfish_web/server_socket.ex b/lib/jellyfish_web/server_socket.ex index 7aac5a5a..1dab7b67 100644 --- a/lib/jellyfish_web/server_socket.ex +++ b/lib/jellyfish_web/server_socket.ex @@ -11,6 +11,7 @@ defmodule JellyfishWeb.ServerSocket do Authenticated, AuthRequest, ComponentCrashed, + HlsPlayable, MetricsReport, PeerConnected, PeerCrashed, @@ -148,35 +149,8 @@ defmodule JellyfishWeb.ServerSocket do @impl true def handle_info(msg, state) do - content = - case msg do - {:room_created, room_id} -> - {:room_created, %RoomCreated{room_id: room_id}} - - {:room_deleted, room_id} -> - {:room_deleted, %RoomDeleted{room_id: room_id}} - - {:room_crashed, room_id} -> - {:room_crashed, %RoomCrashed{room_id: room_id}} - - {:peer_connected, room_id, peer_id} -> - {:peer_connected, %PeerConnected{room_id: room_id, peer_id: peer_id}} - - {:peer_disconnected, room_id, peer_id} -> - {:peer_disconnected, %PeerDisconnected{room_id: room_id, peer_id: peer_id}} - - {:peer_crashed, room_id, peer_id} -> - {:peer_crashed, %PeerCrashed{room_id: room_id, peer_id: peer_id}} - - {:component_crashed, room_id, component_id} -> - {:component_crashed, %ComponentCrashed{room_id: room_id, component_id: component_id}} - - {:metrics, report} -> - {:metrics_report, %MetricsReport{metrics: report}} - end - + content = to_proto_notification(msg) encoded_msg = %ServerMessage{content: content} |> ServerMessage.encode() - {:push, {:binary, encoded_msg}, state} end @@ -212,7 +186,7 @@ defmodule JellyfishWeb.ServerSocket do |> Enum.map( &%RoomState.Component{ id: &1.id, - type: to_proto_type(&1.type) + component: to_proto_component(&1) } ) @@ -235,8 +209,33 @@ defmodule JellyfishWeb.ServerSocket do %RoomState{id: room.id, config: config, peers: peers, components: components} end - defp to_proto_type(Jellyfish.Component.HLS), do: :TYPE_HLS - defp to_proto_type(Jellyfish.Component.RTSP), do: :TYPE_RTSP + defp to_proto_notification({:room_created, room_id}), + do: {:room_created, %RoomCreated{room_id: room_id}} + + defp to_proto_notification({:room_deleted, room_id}), + do: {:room_deleted, %RoomDeleted{room_id: room_id}} + + defp to_proto_notification({:room_crashed, room_id}), + do: {:room_crashed, %RoomCrashed{room_id: room_id}} + + defp to_proto_notification({:peer_connected, room_id, peer_id}), + do: {:peer_connected, %PeerConnected{room_id: room_id, peer_id: peer_id}} + + defp to_proto_notification({:peer_disconnected, room_id, peer_id}), + do: {:peer_disconnected, %PeerDisconnected{room_id: room_id, peer_id: peer_id}} + + defp to_proto_notification({:peer_crashed, room_id, peer_id}), + do: {:peer_crashed, %PeerCrashed{room_id: room_id, peer_id: peer_id}} + + defp to_proto_notification({:component_crashed, room_id, component_id}), + do: {:component_crashed, %ComponentCrashed{room_id: room_id, component_id: component_id}} + + defp to_proto_notification({:metrics, report}), + do: {:metrics_report, %MetricsReport{metrics: report}} + + defp to_proto_notification({:hls_playable, room_id, component_id}), + do: {:hls_playable, %HlsPlayable{room_id: room_id, component_id: component_id}} + defp to_proto_type(Jellyfish.Peer.WebRTC), do: :TYPE_WEBRTC defp to_proto_codec(:h264), do: :CODEC_H264 @@ -245,4 +244,10 @@ defmodule JellyfishWeb.ServerSocket do defp to_proto_status(:disconnected), do: :STATUS_DISCONNECTED defp to_proto_status(:connected), do: :STATUS_CONNECTED + + defp to_proto_component(%{type: Jellyfish.Component.HLS, metadata: %{playable: playable}}), + do: {:hls, %RoomState.Component.Hls{playable: playable}} + + defp to_proto_component(%{type: Jellyfish.Component.RTSP}), + do: {:rtsp, %RoomState.Component.Rtsp{}} end diff --git a/lib/protos/jellyfish/server_notifications.pb.ex b/lib/protos/jellyfish/server_notifications.pb.ex index 6486fcac..9d2eeffa 100644 --- a/lib/protos/jellyfish/server_notifications.pb.ex +++ b/lib/protos/jellyfish/server_notifications.pb.ex @@ -36,16 +36,6 @@ defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer.Status do field :STATUS_DISCONNECTED, 2 end -defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Type do - @moduledoc false - - use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 - - field :TYPE_UNSPECIFIED, 0 - field :TYPE_HLS, 1 - field :TYPE_RTSP, 2 -end - defmodule Jellyfish.ServerMessage.RoomCrashed do @moduledoc false @@ -168,16 +158,33 @@ defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer do enum: true end +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Hls do + @moduledoc false + + use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 + + field :playable, 1, type: :bool +end + +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Rtsp do + @moduledoc false + + use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 +end + defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 + oneof :component, 0 + field :id, 1, type: :string + field :hls, 2, type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Hls, oneof: 0 - field :type, 2, - type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Type, - enum: true + field :rtsp, 3, + type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Rtsp, + oneof: 0 end defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState do @@ -259,6 +266,15 @@ defmodule Jellyfish.ServerMessage.MetricsReport do field :metrics, 1, type: :string end +defmodule Jellyfish.ServerMessage.HlsPlayable do + @moduledoc false + + use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 + + field :room_id, 1, type: :string, json_name: "roomId" + field :component_id, 2, type: :string, json_name: "componentId" +end + defmodule Jellyfish.ServerMessage do @moduledoc false @@ -322,4 +338,9 @@ defmodule Jellyfish.ServerMessage do type: Jellyfish.ServerMessage.MetricsReport, json_name: "metricsReport", oneof: 0 + + field :hls_playable, 13, + type: Jellyfish.ServerMessage.HlsPlayable, + json_name: "hlsPlayable", + oneof: 0 end diff --git a/openapi.yaml b/openapi.yaml index 452265e0..d814de10 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -15,11 +15,14 @@ components: description: Assigned component id example: component-1 type: string + metadata: + $ref: '#/components/schemas/ComponentMetadata' type: $ref: '#/components/schemas/ComponentType' required: - id - type + - metadata title: Component type: object x-struct: Elixir.JellyfishWeb.ApiSpec.Component @@ -33,6 +36,14 @@ components: title: ComponentDetailsResponse type: object x-struct: Elixir.JellyfishWeb.ApiSpec.ComponentDetailsResponse + ComponentMetadata: + description: Component-specific metadata + properties: + playable: + type: boolean + title: ComponentMetadata + type: object + x-struct: Elixir.JellyfishWeb.ApiSpec.Component.Metadata ComponentOptions: description: Component-specific options oneOf: diff --git a/protos b/protos index f6d060f8..28310041 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit f6d060f8f443a6ee29ee5e7fb6fcd4bfc1d1d466 +Subproject commit 283100419e2693f131c26a737d617758367301b6 diff --git a/test/jellyfish_web/controllers/component_controller_test.exs b/test/jellyfish_web/controllers/component_controller_test.exs index 7c30c79c..2ce8f44b 100644 --- a/test/jellyfish_web/controllers/component_controller_test.exs +++ b/test/jellyfish_web/controllers/component_controller_test.exs @@ -42,7 +42,10 @@ defmodule JellyfishWeb.ComponentControllerTest do conn = post(conn, ~p"/room/#{room_id}/component", type: "hls") - assert response = %{"data" => %{"id" => id}} = json_response(conn, :created) + assert response = + %{"data" => %{"id" => id, "type" => "hls", "metadata" => %{"playable" => false}}} = + json_response(conn, :created) + assert_response_schema(response, "ComponentDetailsResponse", @schema) conn = get(conn, ~p"/room/#{room_id}") @@ -80,7 +83,10 @@ defmodule JellyfishWeb.ComponentControllerTest do options: %{sourceUri: @source_uri} ) - assert response = %{"data" => %{"id" => id}} = json_response(conn, :created) + assert response = + %{"data" => %{"id" => id, "type" => "rtsp", "metadata" => %{}}} = + json_response(conn, :created) + assert_response_schema(response, "ComponentDetailsResponse", @schema) conn = get(conn, ~p"/room/#{room_id}") diff --git a/test/jellyfish_web/integration/server_socket_test.exs b/test/jellyfish_web/integration/server_socket_test.exs index afe61248..326e9ffb 100644 --- a/test/jellyfish_web/integration/server_socket_test.exs +++ b/test/jellyfish_web/integration/server_socket_test.exs @@ -4,12 +4,13 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do alias __MODULE__.Endpoint alias Jellyfish.PeerMessage - + alias Jellyfish.RoomService alias Jellyfish.ServerMessage alias Jellyfish.ServerMessage.{ Authenticated, AuthRequest, + HlsPlayable, MetricsReport, PeerConnected, PeerDisconnected, @@ -23,6 +24,9 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do alias Jellyfish.ServerMessage.SubscribeRequest.{Metrics, ServerNotification} alias Jellyfish.ServerMessage.SubscribeResponse.{RoomNotFound, RoomsState, RoomState} + alias Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component + alias Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.{Hls, Rtsp} + alias JellyfishWeb.{PeerSocket, ServerSocket, WS} @port 5907 @@ -32,6 +36,8 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do @max_peers 1 @video_codec :CODEC_H264 + @source_uri "rtsp://placeholder-19inrifjbsjb.it:12345/afwefae" + Application.put_env( :jellyfish, Endpoint, @@ -146,6 +152,9 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do {room_id, peer_id, _token, conn} = add_room_and_peer(conn, server_api_token) + {conn, hls_id} = add_hls_component(conn, room_id) + {conn, _rtsp_id} = add_rtsp_component(conn, room_id) + response = subscribe( ws, @@ -162,7 +171,7 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do %RoomState{ id: ^room_id, config: %{max_peers: @max_peers, video_codec: @video_codec}, - components: [], + components: components, peers: [ %RoomState.Peer{ id: ^peer_id, @@ -175,6 +184,19 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do }} } = response + assert components + |> Enum.map(fn %Component{component: component} -> component end) + |> Enum.all?(fn + {:hls, %Hls{playable: false}} -> true + {:rtsp, %Rtsp{}} -> true + _other -> false + end) + + {:ok, room_pid} = RoomService.find_room(room_id) + + send(room_pid, {:playlist_playable, :video, "hls_output/#{room_id}"}) + assert_receive %HlsPlayable{room_id: room_id, component_id: ^hls_id} + conn = delete(conn, ~p"/room/#{room_id}/") assert response(conn, :no_content) end @@ -346,6 +368,25 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do {room_id, peer_id, peer_token, conn} end + defp add_hls_component(conn, room_id) do + conn = post(conn, ~p"/room/#{room_id}/component", type: "hls") + + assert %{"id" => id, "metadata" => %{"playable" => false}, "type" => "hls"} = + json_response(conn, :created)["data"] + + {conn, id} + end + + defp add_rtsp_component(conn, room_id) do + conn = + post(conn, ~p"/room/#{room_id}/component", type: "rtsp", options: %{sourceUri: @source_uri}) + + assert %{"id" => id, "metadata" => %{}, "type" => "rtsp"} = + json_response(conn, :created)["data"] + + {conn, id} + end + defp auth_request(token) do ServerMessage.encode(%ServerMessage{content: {:auth_request, %AuthRequest{token: token}}}) end