Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscription #47

Merged
merged 5 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[submodule "protos"]
path = protos
url = https://github.com/jellyfish-dev/protos.git
branch = master
branch = actually-subscribe-for-notifications
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget to revert this 🙏


8 changes: 6 additions & 2 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ defmodule Jellyfish.Room do
Logger.error("RTC Engine endpoint #{inspect(endpoint_id)} crashed")

if Map.has_key?(state.peers, endpoint_id) do
Phoenix.PubSub.broadcast(Jellyfish.PubSub, "server", {:peer_crashed, state.id, endpoint_id})
Phoenix.PubSub.broadcast(
Jellyfish.PubSub,
"server_notification",
{:peer_crashed, state.id, endpoint_id}
)

peer = Map.fetch!(state.peers, endpoint_id)

Expand All @@ -272,7 +276,7 @@ defmodule Jellyfish.Room do
else
Phoenix.PubSub.broadcast(
Jellyfish.PubSub,
"server",
"server_notification",
{:component_crashed, state.id, endpoint_id}
)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/jellyfish/room_service.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ defmodule Jellyfish.RoomService do
Logger.warn("Process #{room_id} is down with reason: #{reason}")

Phoenix.PubSub.broadcast(Jellyfish.PubSub, room_id, :room_crashed)
Phoenix.PubSub.broadcast(Jellyfish.PubSub, "server", {:room_crashed, room_id})
Phoenix.PubSub.broadcast(Jellyfish.PubSub, "server_notification", {:room_crashed, room_id})

{:noreply, state}
end
Expand Down
4 changes: 2 additions & 2 deletions lib/jellyfish_web/peer_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule JellyfishWeb.PeerSocket do

Phoenix.PubSub.broadcast(
Jellyfish.PubSub,
"server",
"server_notification",
{:peer_connected, room_id, peer_id}
)

Expand Down Expand Up @@ -140,7 +140,7 @@ defmodule JellyfishWeb.PeerSocket do
if Map.has_key?(state, :peer_id) do
Phoenix.PubSub.broadcast(
Jellyfish.PubSub,
"server",
"server_notification",
{:peer_disconnected, state.room_id, state.peer_id}
)
end
Expand Down
67 changes: 47 additions & 20 deletions lib/jellyfish_web/server_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ defmodule JellyfishWeb.ServerSocket do
PeerCrashed,
PeerDisconnected,
RoomCrashed,
RoomNotFound,
RoomsState,
RoomState,
RoomStateRequest
SubscribeRequest,
SubscriptionResponse
}

alias Jellyfish.ServerMessage.RoomState.{Component, Config, Peer}
alias Jellyfish.ServerMessage.SubscriptionResponse.{RoomNotFound, RoomsState, RoomState}

@heartbeat_interval 30_000

Expand All @@ -45,7 +43,6 @@ defmodule JellyfishWeb.ServerSocket do
case ServerMessage.decode(encoded_message) do
%ServerMessage{content: {:auth_request, %AuthRequest{token: token}}} ->
if token == Application.fetch_env!(:jellyfish, :server_api_token) do
:ok = Phoenix.PubSub.subscribe(Jellyfish.PubSub, "server")
Process.send_after(self(), :send_ping, @heartbeat_interval)

encoded_message =
Expand Down Expand Up @@ -82,12 +79,11 @@ defmodule JellyfishWeb.ServerSocket do
end

def handle_in({encoded_message, [opcode: :binary]}, state) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def handle_in({encoded_message, [opcode: :binary]}, state) do
@impl true
def handle_in({encoded_message, [opcode: :binary]}, state) do

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick; might as well annotate the handle_in callback from line 102.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh, I prefer to make the annotations only once - above the first callback clause, but if everyone prefers adding annotations before each callback I will change this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be clearer this way, GPT4 seems to agree, but ultimately it's your decision

with %ServerMessage{content: request} <- ServerMessage.decode(encoded_message),
{:room_state_request, %RoomStateRequest{content: {_variant, option}}} <- request do
room_state = get_room_state(option)

with %ServerMessage{content: {:subscribe_request, request}} <-
ServerMessage.decode(encoded_message),
{:ok, response} <- handle_subscribe(request) do
reply =
%ServerMessage{content: room_state}
%ServerMessage{content: response}
|> ServerMessage.encode()

{:reply, :ok, {:binary, reply}, state}
Expand Down Expand Up @@ -115,6 +111,28 @@ defmodule JellyfishWeb.ServerSocket do
{:stop, :closed, {1003, "operation not allowed"}, state}
end

defp handle_subscribe(%SubscribeRequest{
id: id,
event_type:
{:server_notification,
%SubscribeRequest.ServerNotification{room_id: {_variant, option}}}
}) do
:ok = Phoenix.PubSub.subscribe(Jellyfish.PubSub, "server_notification")

room_state = get_room_state(option)

msg =
{:subscription_response,
%SubscriptionResponse{
id: id,
content: room_state
}}

{:ok, msg}
end

defp handle_subscribe(request), do: request
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😠, pls remove


@impl true
def handle_info(:send_ping, state) do
Process.send_after(self(), :send_ping, @heartbeat_interval)
Expand Down Expand Up @@ -162,7 +180,7 @@ defmodule JellyfishWeb.ServerSocket do
:ok
end

defp get_room_state(:ALL) do
defp get_room_state(:OPTION_ALL) do
rooms =
RoomService.list_rooms()
|> Enum.map(&to_room_state_message/1)
Expand All @@ -185,24 +203,33 @@ defmodule JellyfishWeb.ServerSocket do
components =
room.components
|> Map.values()
|> Enum.map(&%Component{id: &1.id, type: to_proto_type(&1.type)})
|> Enum.map(
&%RoomState.Component{
id: &1.id,
type: to_proto_type(&1.type)
}
)

peers =
room.peers
|> Map.values()
|> Enum.map(
&%Peer{id: &1.id, type: to_proto_type(&1.type), status: to_proto_status(&1.status)}
&%RoomState.Peer{
id: &1.id,
type: to_proto_type(&1.type),
status: to_proto_status(&1.status)
}
)

config = struct!(Config, room.config)
config = struct!(RoomState.Config, room.config)

%RoomState{id: room.id, config: config, peers: peers, components: components}
end

defp to_proto_type(Jellyfish.Component.HLS), do: :HLS
defp to_proto_type(Jellyfish.Component.RTSP), do: :HLS
defp to_proto_type(Jellyfish.Peer.WebRTC), do: :WEBRTC
defp to_proto_type(Jellyfish.Component.HLS), do: :TYPE_HLS
defp to_proto_type(Jellyfish.Component.RTSP), do: :TYPE_RTSP
defp to_proto_type(Jellyfish.Peer.WebRTC), do: :TYPE_WEBRTC

defp to_proto_status(:disconnected), do: :DISCONNECTED
defp to_proto_status(:connected), do: :CONNECTED
defp to_proto_status(:disconnected), do: :STATUS_DISCONNECTED
defp to_proto_status(:connected), do: :STATUS_CONNECTED
end
134 changes: 95 additions & 39 deletions lib/protos/jellyfish/server_notifications.pb.ex
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
defmodule Jellyfish.ServerMessage.RoomStateRequest.Option do
defmodule Jellyfish.ServerMessage.SubscribeRequest.ServerNotification.Option do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :ALL, 0
field :OPTION_UNSPECIFIED, 0
field :OPTION_ALL, 1
end

defmodule Jellyfish.ServerMessage.RoomState.Peer.Type do
defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Type do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :WEBRTC, 0
field :TYPE_UNSPECIFIED, 0
field :TYPE_WEBRTC, 1
end

defmodule Jellyfish.ServerMessage.RoomState.Peer.Status do
defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Status do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :CONNECTED, 0
field :DISCONNECTED, 1
field :STATUS_UNSPECIFIED, 0
field :STATUS_CONNECTED, 1
field :STATUS_DISCONNECTED, 2
end

defmodule Jellyfish.ServerMessage.RoomState.Component.Type do
defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Component.Type do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :HLS, 0
field :RTSP, 1
field :TYPE_UNSPECIFIED, 0
field :TYPE_HLS, 1
field :TYPE_RTSP, 2
end

defmodule Jellyfish.ServerMessage.RoomCrashed do
Expand Down Expand Up @@ -90,69 +94,128 @@ defmodule Jellyfish.ServerMessage.AuthRequest do
field :token, 1, type: :string
end

defmodule Jellyfish.ServerMessage.RoomStateRequest do
defmodule Jellyfish.ServerMessage.SubscribeRequest.ServerNotification do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

oneof :content, 0
oneof :room_id, 0

field :id, 1, type: :string, oneof: 0
field :option, 2, type: Jellyfish.ServerMessage.RoomStateRequest.Option, enum: true, oneof: 0

field :option, 2,
type: Jellyfish.ServerMessage.SubscribeRequest.ServerNotification.Option,
enum: true,
oneof: 0
end

defmodule Jellyfish.ServerMessage.SubscribeRequest do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

oneof :event_type, 0

field :id, 1, type: :string

field :server_notification, 2,
type: Jellyfish.ServerMessage.SubscribeRequest.ServerNotification,
json_name: "serverNotification",
oneof: 0
end

defmodule Jellyfish.ServerMessage.RoomState.Config do
defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Config do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :max_peers, 1, type: :uint32, json_name: "maxPeers"
end

defmodule Jellyfish.ServerMessage.RoomState.Peer do
defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :id, 1, type: :string
field :type, 2, type: Jellyfish.ServerMessage.RoomState.Peer.Type, enum: true
field :status, 3, type: Jellyfish.ServerMessage.RoomState.Peer.Status, enum: true

field :type, 2,
type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Type,
enum: true

field :status, 3,
type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Status,
enum: true
end

defmodule Jellyfish.ServerMessage.RoomState.Component do
defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Component do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :id, 1, type: :string
field :type, 2, type: Jellyfish.ServerMessage.RoomState.Component.Type, enum: true

field :type, 2,
type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Component.Type,
enum: true
end

defmodule Jellyfish.ServerMessage.RoomState do
defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :id, 1, type: :string
field :config, 2, type: Jellyfish.ServerMessage.RoomState.Config
field :peers, 3, repeated: true, type: Jellyfish.ServerMessage.RoomState.Peer
field :components, 4, repeated: true, type: Jellyfish.ServerMessage.RoomState.Component
field :config, 2, type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Config

field :peers, 3,
repeated: true,
type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer

field :components, 4,
repeated: true,
type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Component
end

defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomsState do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :rooms, 1, repeated: true, type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState
end

defmodule Jellyfish.ServerMessage.RoomsState do
defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomNotFound do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :rooms, 1, repeated: true, type: Jellyfish.ServerMessage.RoomState
field :id, 1, type: :string
end

defmodule Jellyfish.ServerMessage.RoomNotFound do
defmodule Jellyfish.ServerMessage.SubscriptionResponse do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

oneof :content, 0

field :id, 1, type: :string

field :room_state, 2,
type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState,
json_name: "roomState",
oneof: 0

field :rooms_state, 3,
type: Jellyfish.ServerMessage.SubscriptionResponse.RoomsState,
json_name: "roomsState",
oneof: 0

field :room_not_found, 4,
type: Jellyfish.ServerMessage.SubscriptionResponse.RoomNotFound,
json_name: "roomNotFound",
oneof: 0
end

defmodule Jellyfish.ServerMessage do
Expand Down Expand Up @@ -194,20 +257,13 @@ defmodule Jellyfish.ServerMessage do
json_name: "authRequest",
oneof: 0

field :room_state_request, 8,
type: Jellyfish.ServerMessage.RoomStateRequest,
json_name: "roomStateRequest",
field :subscribe_request, 8,
type: Jellyfish.ServerMessage.SubscribeRequest,
json_name: "subscribeRequest",
oneof: 0

field :room_state, 9, type: Jellyfish.ServerMessage.RoomState, json_name: "roomState", oneof: 0

field :rooms_state, 10,
type: Jellyfish.ServerMessage.RoomsState,
json_name: "roomsState",
oneof: 0

field :room_not_found, 11,
type: Jellyfish.ServerMessage.RoomNotFound,
json_name: "roomNotFound",
field :subscription_response, 9,
type: Jellyfish.ServerMessage.SubscriptionResponse,
json_name: "subscriptionResponse",
oneof: 0
end
Loading