Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribing for events #19

Merged
merged 5 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "protos"]
path = protos
url = https://github.com/jellyfish-dev/protos.git
branch = master
branch = actually-subscribe-for-notifications
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a friendly reminder

6 changes: 3 additions & 3 deletions lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Jellyfish.Component do

alias Jellyfish.Component.{HLS, RTSP}
alias Jellyfish.Exception.StructureError
alias Jellyfish.ServerMessage.RoomState
alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState

@enforce_keys [
:id,
Expand Down Expand Up @@ -83,6 +83,6 @@ defmodule Jellyfish.Component do
defp type_from_string("hls"), do: HLS
defp type_from_string("rtsp"), do: RTSP

defp type_from_proto(:HLS), do: HLS
defp type_from_proto(:RTSP), do: RTSP
defp type_from_proto(:TYPE_HLS), do: HLS
defp type_from_proto(:TYPE_RTSP), do: RTSP
end
142 changes: 87 additions & 55 deletions lib/jellyfish/notifier.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
defmodule Jellyfish.Notifier do
@moduledoc """
Module defining a process responsible for establishing
WebSocket connection and receiving notifications form Jellyfish server.
WebSocket connection and receiving notifications from Jellyfish server.

Define the connection configuration in the mix config
``` config.exs
config :jellyfish_server_sdk,
server_address: "localhost:5002",
server_api_token: "your-jellyfish-token",
secure?: true
```

```
iex> {:ok, notifier} = Jellyfish.Notifier.start(server_address: "localhost:5002", server_api_token: "your-jellyfish-token")
# Start the Notifier
iex> {:ok, notifier} = Jellyfish.Notifier.start()
{:ok, #PID<0.301.0>}
iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :all)

# Optionally, you can provide connection options instead of using config:
{:ok, notifier} = Jellyfish.Notifier.start(server_address: "localhost:5002", server_api_token: "your-jellyfish-token")
```

```
# Subscribe current process to all server notifications.
iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :server_notifications, :all)

# here add a room and a peer using functions from `Jellyfish.Room` module
# you should receive a notification after the peer established connection
Expand All @@ -28,14 +44,16 @@ defmodule Jellyfish.Notifier do
alias Jellyfish.ServerMessage.{
Authenticated,
AuthRequest,
RoomNotFound,
RoomsState,
RoomState,
RoomStateRequest
SubscribeRequest,
SubscriptionResponse
}

alias Jellyfish.ServerMessage.SubscribeRequest.ServerNotification
alias Jellyfish.ServerMessage.SubscriptionResponse.{RoomNotFound, RoomsState, RoomState}

@auth_timeout 2000
@subscribe_timeout 5000
@valid_events [:server_notification]

@typedoc """
The reference to the `Notifier` process.
Expand All @@ -57,7 +75,7 @@ defmodule Jellyfish.Notifier do
@doc """
Starts the Notifier process and connects to Jellyfish.

To learn how to receive notifications, see `subscribe/2`.
To learn how to receive notifications, see `subscribe/3`.

For information about options, see `t:Jellyfish.Client.connection_options/0`.
"""
Expand All @@ -67,58 +85,71 @@ defmodule Jellyfish.Notifier do
end

@doc """
Subscribes the process to receive server notifications about room with `room_id` and returns
Subscribes the process to receive events of `event_type` from room with `room_id` and returns
current state of the room.

Currently supported event is `:server_notification`.

If `:all` is passed in place of `room_id`, notifications about all of the rooms will be sent and
list of all of the room's states is returned.

Notifications are sent to the process in a form of `{:jellyfish, msg}`,
where `msg` is one of structs defined under "Notifications" section in the docs,
for example `{:jellyfish, %Jellyfish.Notification.RoomCrashed{room_id: "some_id"}}`.
"""
@spec subscribe(notifier(), Room.id() | :all) :: {:ok, Room.t() | [Room.t()]} | {:error, atom()}
def subscribe(notifier, room_id) do
WebSockex.cast(notifier, {:subscribe, self(), room_id})
@spec subscribe(notifier(), :server_notification, Room.id() | :all) ::
LVala marked this conversation as resolved.
Show resolved Hide resolved
{:ok, Room.t() | [Room.t()]} | {:error, atom()}
def subscribe(notifier, event_type, room_id) when event_type in @valid_events do
WebSockex.cast(notifier, {:subscribe, self(), event_type, room_id})

receive do
{:jellyfish, {:subscribe_answer, answer}} -> answer
{:error, _reason} = error -> error
LVala marked this conversation as resolved.
Show resolved Hide resolved
after
@subscribe_timeout -> {:error, :timeout}
end
end

@impl true
def handle_frame({:binary, msg}, state) do
%ServerMessage{content: {_type, notification}} = ServerMessage.decode(msg)
state = handle_notification(notification, state)

{:ok, state}
def subscribe(_notifier, _event_type, _room_id) do
{:error, :invalid_event_type}
end

@impl true
def handle_cast({:subscribe, pid, room_id}, state) do
# we use simple FIFO queue to keep track of different
# processes wanting to subscribe to the same room's notifications
# is assumes that the WebSocket ensures transport order as well as
# the Jellyfish ensures processing order
state =
update_in(state.pending_subscriptions[room_id], fn
nil -> [pid]
pids -> [pid | pids]
end)

room_request =
def handle_cast({:subscribe, pid, :server_notification, room_id}, state) do
proto_room_id =
case room_id do
:all -> {:option, :ALL}
:all -> {:option, :OPTION_ALL}
id -> {:id, id}
end

msg =
%ServerMessage{content: {:room_state_request, %RoomStateRequest{content: room_request}}}
request_id = UUID.uuid4()

request =
%ServerMessage{
content:
{:subscribe_request,
%SubscribeRequest{
id: request_id,
event_type:
{:server_notification,
%ServerNotification{
room_id: proto_room_id
}}
}}
}
|> ServerMessage.encode()

{:reply, {:binary, msg}, state}
state = put_in(state.pending_subscriptions[request_id], pid)

{:reply, {:binary, request}, state}
end

@impl true
def handle_frame({:binary, msg}, state) do
%ServerMessage{content: {_type, notification}} = ServerMessage.decode(msg)
state = handle_notification(notification, state)

{:ok, state}
end

@impl true
Expand Down Expand Up @@ -155,18 +186,18 @@ defmodule Jellyfish.Notifier do
%ServerMessage{content: {:auth_request, %AuthRequest{token: api_token}}}
|> ServerMessage.encode()

with {:ok, pid} <-
with {:ok, ws} <-
apply(WebSockex, fun, ["#{address}/socket/server/websocket", __MODULE__, state]),
:ok <- WebSockex.send_frame(pid, {:binary, auth_msg}) do
:ok <- WebSockex.send_frame(ws, {:binary, auth_msg}) do
receive do
{:jellyfish, :authenticated} ->
{:ok, pid}
{:ok, ws}

{:jellyfish, :invalid_token} ->
{:error, :invalid_token}
after
@auth_timeout ->
Process.exit(pid, :normal)
Process.exit(ws, :normal)
{:error, :authentication_timeout}
end
else
Expand All @@ -180,24 +211,35 @@ defmodule Jellyfish.Notifier do
state
end

defp handle_notification(%RoomNotFound{id: id}, state) do
{pid, pids} = List.pop_at(state.pending_subscriptions[id], -1)
state = put_in(state.pending_subscriptions[id], pids)
defp handle_notification(%SubscriptionResponse{id: id, content: {_type, response}}, state) do
{pid, state} = pop_in(state.pending_subscriptions[id])

handle_subscription_response(pid, response, state)
end

defp handle_notification(%{room_id: room_id} = message, state) do
state.subscriptions
|> Map.take([:all, room_id])
|> Map.values()
|> Enum.reduce(fn pids, acc -> MapSet.union(pids, acc) end)
|> Enum.each(&send(&1, {:jellyfish, Notification.to_notification(message)}))

state
end

defp handle_subscription_response(pid, %RoomNotFound{}, state) do
send(pid, {:jellyfish, {:subscribe_answer, {:error, :room_not_found}}})
state
end

defp handle_notification(%mod{} = room, state) when mod in [RoomState, RoomsState] do
defp handle_subscription_response(pid, %mod{} = room, state)
when mod in [RoomState, RoomsState] do
{room_id, room} =
case mod do
RoomState -> {room.id, Room.from_proto(room)}
RoomsState -> {:all, Enum.map(room.rooms, &Room.from_proto/1)}
end

{pid, pids} = List.pop_at(state.pending_subscriptions[room_id], -1)
state = put_in(state.pending_subscriptions[room_id], pids)

Process.monitor(pid)

state =
Expand All @@ -209,14 +251,4 @@ defmodule Jellyfish.Notifier do
send(pid, {:jellyfish, {:subscribe_answer, {:ok, room}}})
state
end

defp handle_notification(%{room_id: room_id} = message, state) do
state.subscriptions
|> Map.take([:all, room_id])
|> Map.values()
|> Enum.reduce(fn pids, acc -> MapSet.union(pids, acc) end)
|> Enum.each(&send(&1, {:jellyfish, Notification.to_notification(message)}))

state
end
end
8 changes: 4 additions & 4 deletions lib/jellyfish/peer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Jellyfish.Peer do

alias Jellyfish.Exception.StructureError
alias Jellyfish.Peer.WebRTC
alias Jellyfish.ServerMessage.RoomState
alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState

@enforce_keys [
:id,
Expand Down Expand Up @@ -98,8 +98,8 @@ defmodule Jellyfish.Peer do
def status_from_string(status) when status in @valid_status_strings,
do: String.to_atom(status)

defp type_from_proto(:WEBRTC), do: WebRTC
defp type_from_proto(:TYPE_WEBRTC), do: WebRTC

defp status_from_proto(:CONNECTED), do: :connected
defp status_from_proto(:DISCONNECTED), do: :disconnected
defp status_from_proto(:STATUS_CONNECTED), do: :connected
defp status_from_proto(:STATUS_DISCONNECTED), do: :disconnected
end
2 changes: 1 addition & 1 deletion lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule Jellyfish.Room do
alias Tesla.Env
alias Jellyfish.{Client, Component, Peer}
alias Jellyfish.Exception.StructureError
alias Jellyfish.ServerMessage.RoomState
alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState

@enforce_keys [
:id,
Expand Down
Loading