Skip to content

Commit

Permalink
Subscribing for events (#19)
Browse files Browse the repository at this point in the history
* Subscribing for events

* Send room state in subscription

* Add type for event_type

* Update proto
  • Loading branch information
roznawsk committed Jul 12, 2023
1 parent 3565522 commit d1e267a
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 111 deletions.
6 changes: 3 additions & 3 deletions lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Jellyfish.Component do

alias Jellyfish.Component.{HLS, RTSP}
alias Jellyfish.Exception.StructureError
alias Jellyfish.ServerMessage.RoomState
alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState

@enforce_keys [
:id,
Expand Down Expand Up @@ -83,6 +83,6 @@ defmodule Jellyfish.Component do
defp type_from_string("hls"), do: HLS
defp type_from_string("rtsp"), do: RTSP

defp type_from_proto(:HLS), do: HLS
defp type_from_proto(:RTSP), do: RTSP
defp type_from_proto(:TYPE_HLS), do: HLS
defp type_from_proto(:TYPE_RTSP), do: RTSP
end
147 changes: 92 additions & 55 deletions lib/jellyfish/notifier.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
defmodule Jellyfish.Notifier do
@moduledoc """
Module defining a process responsible for establishing
WebSocket connection and receiving notifications form Jellyfish server.
WebSocket connection and receiving notifications from Jellyfish server.
Define the connection configuration in the mix config
``` config.exs
config :jellyfish_server_sdk,
server_address: "localhost:5002",
server_api_token: "your-jellyfish-token",
secure?: true
```
```
iex> {:ok, notifier} = Jellyfish.Notifier.start(server_address: "localhost:5002", server_api_token: "your-jellyfish-token")
# Start the Notifier
iex> {:ok, notifier} = Jellyfish.Notifier.start()
{:ok, #PID<0.301.0>}
iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :all)
# Optionally, you can provide connection options instead of using config:
{:ok, notifier} = Jellyfish.Notifier.start(server_address: "localhost:5002", server_api_token: "your-jellyfish-token")
```
```
# Subscribe current process to all server notifications.
iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :server_notifications, :all)
# here add a room and a peer using functions from `Jellyfish.Room` module
# you should receive a notification after the peer established connection
Expand All @@ -28,20 +44,27 @@ defmodule Jellyfish.Notifier do
alias Jellyfish.ServerMessage.{
Authenticated,
AuthRequest,
RoomNotFound,
RoomsState,
RoomState,
RoomStateRequest
SubscribeRequest,
SubscriptionResponse
}

alias Jellyfish.ServerMessage.SubscribeRequest.ServerNotification
alias Jellyfish.ServerMessage.SubscriptionResponse.{RoomNotFound, RoomsState, RoomState}

@auth_timeout 2000
@subscribe_timeout 5000
@valid_events [:server_notification]

@typedoc """
The reference to the `Notifier` process.
"""
@type notifier() :: GenServer.server()

@typedoc """
A type of event, for which a process can subscribe using `subscribe/3`.
"""
@type event_type() :: :server_notification

@doc """
Starts the Notifier process and connects to Jellyfish.
Expand All @@ -57,7 +80,7 @@ defmodule Jellyfish.Notifier do
@doc """
Starts the Notifier process and connects to Jellyfish.
To learn how to receive notifications, see `subscribe/2`.
To learn how to receive notifications, see `subscribe/3`.
For information about options, see `t:Jellyfish.Client.connection_options/0`.
"""
Expand All @@ -67,58 +90,71 @@ defmodule Jellyfish.Notifier do
end

@doc """
Subscribes the process to receive server notifications about room with `room_id` and returns
Subscribes the process to receive events of `event_type` from room with `room_id` and returns
current state of the room.
Currently supported event is `:server_notification`.
If `:all` is passed in place of `room_id`, notifications about all of the rooms will be sent and
list of all of the room's states is returned.
Notifications are sent to the process in a form of `{:jellyfish, msg}`,
where `msg` is one of structs defined under "Notifications" section in the docs,
for example `{:jellyfish, %Jellyfish.Notification.RoomCrashed{room_id: "some_id"}}`.
"""
@spec subscribe(notifier(), Room.id() | :all) :: {:ok, Room.t() | [Room.t()]} | {:error, atom()}
def subscribe(notifier, room_id) do
WebSockex.cast(notifier, {:subscribe, self(), room_id})
@spec subscribe(notifier(), event_type(), Room.id() | :all) ::
{:ok, Room.t() | [Room.t()]} | {:error, atom()}
def subscribe(notifier, event_type, room_id) when event_type in @valid_events do
WebSockex.cast(notifier, {:subscribe, self(), event_type, room_id})

receive do
{:jellyfish, {:subscribe_answer, answer}} -> answer
{:error, _reason} = error -> error
after
@subscribe_timeout -> {:error, :timeout}
end
end

@impl true
def handle_frame({:binary, msg}, state) do
%ServerMessage{content: {_type, notification}} = ServerMessage.decode(msg)
state = handle_notification(notification, state)

{:ok, state}
def subscribe(_notifier, _event_type, _room_id) do
{:error, :invalid_event_type}
end

@impl true
def handle_cast({:subscribe, pid, room_id}, state) do
# we use simple FIFO queue to keep track of different
# processes wanting to subscribe to the same room's notifications
# is assumes that the WebSocket ensures transport order as well as
# the Jellyfish ensures processing order
state =
update_in(state.pending_subscriptions[room_id], fn
nil -> [pid]
pids -> [pid | pids]
end)

room_request =
def handle_cast({:subscribe, pid, :server_notification, room_id}, state) do
proto_room_id =
case room_id do
:all -> {:option, :ALL}
:all -> {:option, :OPTION_ALL}
id -> {:id, id}
end

msg =
%ServerMessage{content: {:room_state_request, %RoomStateRequest{content: room_request}}}
request_id = UUID.uuid4()

request =
%ServerMessage{
content:
{:subscribe_request,
%SubscribeRequest{
id: request_id,
event_type:
{:server_notification,
%ServerNotification{
room_id: proto_room_id
}}
}}
}
|> ServerMessage.encode()

{:reply, {:binary, msg}, state}
state = put_in(state.pending_subscriptions[request_id], pid)

{:reply, {:binary, request}, state}
end

@impl true
def handle_frame({:binary, msg}, state) do
%ServerMessage{content: {_type, notification}} = ServerMessage.decode(msg)
state = handle_notification(notification, state)

{:ok, state}
end

@impl true
Expand Down Expand Up @@ -155,18 +191,18 @@ defmodule Jellyfish.Notifier do
%ServerMessage{content: {:auth_request, %AuthRequest{token: api_token}}}
|> ServerMessage.encode()

with {:ok, pid} <-
with {:ok, ws} <-
apply(WebSockex, fun, ["#{address}/socket/server/websocket", __MODULE__, state]),
:ok <- WebSockex.send_frame(pid, {:binary, auth_msg}) do
:ok <- WebSockex.send_frame(ws, {:binary, auth_msg}) do
receive do
{:jellyfish, :authenticated} ->
{:ok, pid}
{:ok, ws}

{:jellyfish, :invalid_token} ->
{:error, :invalid_token}
after
@auth_timeout ->
Process.exit(pid, :normal)
Process.exit(ws, :normal)
{:error, :authentication_timeout}
end
else
Expand All @@ -180,24 +216,35 @@ defmodule Jellyfish.Notifier do
state
end

defp handle_notification(%RoomNotFound{id: id}, state) do
{pid, pids} = List.pop_at(state.pending_subscriptions[id], -1)
state = put_in(state.pending_subscriptions[id], pids)
defp handle_notification(%SubscriptionResponse{id: id, content: {_type, response}}, state) do
{pid, state} = pop_in(state.pending_subscriptions[id])

handle_subscription_response(pid, response, state)
end

defp handle_notification(%{room_id: room_id} = message, state) do
state.subscriptions
|> Map.take([:all, room_id])
|> Map.values()
|> Enum.reduce(fn pids, acc -> MapSet.union(pids, acc) end)
|> Enum.each(&send(&1, {:jellyfish, Notification.to_notification(message)}))

state
end

defp handle_subscription_response(pid, %RoomNotFound{}, state) do
send(pid, {:jellyfish, {:subscribe_answer, {:error, :room_not_found}}})
state
end

defp handle_notification(%mod{} = room, state) when mod in [RoomState, RoomsState] do
defp handle_subscription_response(pid, %mod{} = room, state)
when mod in [RoomState, RoomsState] do
{room_id, room} =
case mod do
RoomState -> {room.id, Room.from_proto(room)}
RoomsState -> {:all, Enum.map(room.rooms, &Room.from_proto/1)}
end

{pid, pids} = List.pop_at(state.pending_subscriptions[room_id], -1)
state = put_in(state.pending_subscriptions[room_id], pids)

Process.monitor(pid)

state =
Expand All @@ -209,14 +256,4 @@ defmodule Jellyfish.Notifier do
send(pid, {:jellyfish, {:subscribe_answer, {:ok, room}}})
state
end

defp handle_notification(%{room_id: room_id} = message, state) do
state.subscriptions
|> Map.take([:all, room_id])
|> Map.values()
|> Enum.reduce(fn pids, acc -> MapSet.union(pids, acc) end)
|> Enum.each(&send(&1, {:jellyfish, Notification.to_notification(message)}))

state
end
end
8 changes: 4 additions & 4 deletions lib/jellyfish/peer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Jellyfish.Peer do

alias Jellyfish.Exception.StructureError
alias Jellyfish.Peer.WebRTC
alias Jellyfish.ServerMessage.RoomState
alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState

@enforce_keys [
:id,
Expand Down Expand Up @@ -98,8 +98,8 @@ defmodule Jellyfish.Peer do
def status_from_string(status) when status in @valid_status_strings,
do: String.to_atom(status)

defp type_from_proto(:WEBRTC), do: WebRTC
defp type_from_proto(:TYPE_WEBRTC), do: WebRTC

defp status_from_proto(:CONNECTED), do: :connected
defp status_from_proto(:DISCONNECTED), do: :disconnected
defp status_from_proto(:STATUS_CONNECTED), do: :connected
defp status_from_proto(:STATUS_DISCONNECTED), do: :disconnected
end
2 changes: 1 addition & 1 deletion lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule Jellyfish.Room do
alias Tesla.Env
alias Jellyfish.{Client, Component, Peer}
alias Jellyfish.Exception.StructureError
alias Jellyfish.ServerMessage.RoomState
alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState

@enforce_keys [
:id,
Expand Down
Loading

0 comments on commit d1e267a

Please sign in to comment.