Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribing for events #19

Merged
merged 5 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "protos"]
path = protos
url = https://github.com/jellyfish-dev/protos.git
branch = master
branch = actually-subscribe-for-notifications
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a friendly reminder

4 changes: 2 additions & 2 deletions lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ defmodule Jellyfish.Component do
defp type_from_string("hls"), do: HLS
defp type_from_string("rtsp"), do: RTSP

defp type_from_proto(:HLS), do: HLS
defp type_from_proto(:RTSP), do: RTSP
defp type_from_proto(:TYPE_HLS), do: HLS
defp type_from_proto(:TYPE_RTSP), do: RTSP
end
161 changes: 124 additions & 37 deletions lib/jellyfish/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,26 @@ defmodule Jellyfish.Notifier do
Module defining a process responsible for establishing
WebSocket connection and receiving notifications form Jellyfish server.
sgfn marked this conversation as resolved.
Show resolved Hide resolved

Define the connection configuration
``` config.exs
config :jellyfish_server_sdk,
server_address: "localhost:5002",
server_api_token: "your-jellyfish-token",
secure?: true
```
iex> {:ok, notifier} = Jellyfish.Notifier.start(server_address: "localhost:5002", server_api_token: "your-jellyfish-token")

Optionally, you can provide it using the options:
```
{:ok, notifier} = Jellyfish.Notifier.start(server_address: "localhost:5002", server_api_token: "your-jellyfish-token")
```

```
# Start the Notifier, specyfing what types of events it will receive from the Jellyfish
iex> {:ok, notifier} = Jellyfish.Notifier.start(events: [:server_notifications])
{:ok, #PID<0.301.0>}
iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :all)

# Subscribe current process to all server notifications.
iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :server_notifications, :all)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that you have to specify that it's :server_notifications seems very redundant, this needs to be changed IMO. I think the SubscribeRequest message should be send with subscribe, and the response would be the state of the rooms (or something else if the type of notifications is metrics etc.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the current solution. We only send SubscribeRequest once, immediately after starting the notifier. For now I see a scenario, in which we define Notifier inside Application and pass the events we want to receive inside start options.
I don't see a scenario, when we would need to subscribe for some new event late after starting the notifier.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether specyfing :server_notifications is redundant - if Notifier is receiving more than one type of event, we have to specify what kind of events we want the calling process to subscribe to. I think the current way to do it (passing an atom representing certain event type) is the easiest from the user perspective.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand, right now starting the Notifier automatically subscribes us to the events passed as the options -- if we keep it this way, we should include a comment about it

At the moment, I feel like it's easy to confuse 1) sending a subscribe request to Jellyfish with 2) subscribing to receive events from the Notifier -- I think we should be more explicit about the distinction.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per Łukasz comment, it has been unified now - calling Notifier.subscribe will every time cause sending a SubscribeRequest message to Jellyfish.


# here add a room and a peer using functions from `Jellyfish.Room` module
# you should receive a notification after the peer established connection
Expand All @@ -22,7 +38,7 @@ defmodule Jellyfish.Notifier do

use WebSockex

alias Jellyfish.{Client, Room, Utils}
alias Jellyfish.{Room, Utils}
alias Jellyfish.{Notification, ServerMessage}

alias Jellyfish.ServerMessage.{
Expand All @@ -31,60 +47,91 @@ defmodule Jellyfish.Notifier do
RoomNotFound,
RoomsState,
RoomState,
RoomStateRequest
RoomStateRequest,
SubscribeRequest,
SubscriptionResponse
}

@auth_timeout 2000
@response_timeout 2000
@subscribe_timeout 5000

@typedoc """
The reference to the `Notifier` process.
"""
@type notifier() :: GenServer.server()

@typedoc """
The connection options used to open connection to Jellyfish, as well as `events` which the
Notifier will be receiving from the Jellyfish.

For more information about connecting to Jellyfish, see `t:Jellyfish.Client.connection_options/0`.
"""
@type options() :: [
server_address: String.t(),
server_api_token: String.t(),
secure?: boolean(),
events: [:server_notification]
]
@valid_events [:server_notification]

@doc """
Starts the Notifier process and connects to Jellyfish.

Acts like `start/1` but links to the calling process.

See `start/1` for more information.
"""
@spec start_link(Client.connection_options()) :: {:ok, pid()} | {:error, term()}
@spec start_link(options()) :: {:ok, pid()} | {:error, term()}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like I said, I think that this function shouldn't need to specify the events.

def start_link(opts \\ []) do
connect(:start_link, opts)
end

@doc """
Starts the Notifier process and connects to Jellyfish.
All event types, which the notifier should receive have to be provided using the `events` option.

To learn how to receive notifications, see `subscribe/2`.
To learn how to receive notifications, see `subscribe/3`.

For information about options, see `t:Jellyfish.Client.connection_options/0`.
For information about options, see `t:options/0`.
"""
@spec start(Client.connection_options()) :: {:ok, pid()} | {:error, term()}
@spec start(options()) :: {:ok, pid()} | {:error, term()}
def start(opts \\ []) do
connect(:start, opts)
end

@doc """
Subscribes the process to receive server notifications about room with `room_id` and returns
Subscribes the process to receive events of `event_type` from room with `room_id` and returns
current state of the room.


Note, that in order to receive a certain type of notifications, they have to be provided when starting the notifier.
Subscribing to an `event_type` which hasn't been defined upon start results in `{:error, unsupported_event_type}`.

If `:all` is passed in place of `room_id`, notifications about all of the rooms will be sent and
list of all of the room's states is returned.

Notifications are sent to the process in a form of `{:jellyfish, msg}`,
where `msg` is one of structs defined under "Notifications" section in the docs,
for example `{:jellyfish, %Jellyfish.Notification.RoomCrashed{room_id: "some_id"}}`.
"""
@spec subscribe(notifier(), Room.id() | :all) :: {:ok, Room.t() | [Room.t()]} | {:error, atom()}
def subscribe(notifier, room_id) do
WebSockex.cast(notifier, {:subscribe, self(), room_id})

receive do
{:jellyfish, {:subscribe_answer, answer}} -> answer
after
@subscribe_timeout -> {:error, :timeout}
@spec subscribe(notifier(), :server_notification, Room.id() | :all) ::
LVala marked this conversation as resolved.
Show resolved Hide resolved
{:ok, Room.t() | [Room.t()]} | {:error, atom()}
def subscribe(notifier, event_type, room_id) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we'll use this function to subscribe to other event types as well, and since the different requests may or may not contain different options, I'd say we might want to implement a solution like in Jellyfish.Room.add_component -- define separate modules with structs for different event types, as in Jellyfish.Component.{HLS, RTSP}, and allow the user to either pass the struct name (which means we'll use default options) or the struct with custom configuration.

with true <- event_type in @valid_events do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def subscribe(notifier, event_type, room_id) do
with true <- event_type in @valid_events do
def subscribe(notifier, event_type, room_id) when event_type in @valid_types do

WebSockex.cast(notifier, {:subscribe, self(), event_type, room_id})

receive do
{:jellyfish, {:subscribe_answer, answer}} ->
answer

{:error, _reason} = error ->
error
after
@subscribe_timeout -> {:error, :timeout}
end
else
false ->
{:error, :invalid_event_type}
end
end

Expand All @@ -97,7 +144,16 @@ defmodule Jellyfish.Notifier do
end

@impl true
def handle_cast({:subscribe, pid, room_id}, state) do
def handle_cast({:subscribe, pid, event_type, _room_id} = request, state) do
if to_proto_event_type(event_type) in state.subscribed_events do
handle_request(request, state)
else
send(pid, {:error, :unsupported_event_type})
{:ok, state}
end
end

defp handle_request({:subscribe, pid, :server_notification, room_id}, state) do
# we use simple FIFO queue to keep track of different
# processes wanting to subscribe to the same room's notifications
# is assumes that the WebSocket ensures transport order as well as
Expand All @@ -110,7 +166,7 @@ defmodule Jellyfish.Notifier do

room_request =
case room_id do
:all -> {:option, :ALL}
:all -> {:option, :OPTION_ALL}
id -> {:id, id}
end

Expand Down Expand Up @@ -148,30 +204,54 @@ defmodule Jellyfish.Notifier do
state = %{
caller_pid: self(),
subscriptions: %{all: MapSet.new()},
pending_subscriptions: %{}
pending_subscriptions: %{},
subscribed_events: []
}

auth_msg =
%ServerMessage{content: {:auth_request, %AuthRequest{token: api_token}}}
with {:ok, events} <- Keyword.get(opts, :events, [:server_notification]) |> validate_events(),
sgfn marked this conversation as resolved.
Show resolved Hide resolved
state = %{state | subscribed_events: events},
{:ok, ws} <-
apply(WebSockex, fun, ["#{address}/socket/server/websocket", __MODULE__, state]),
{:jellyfish, :authenticated} <-
send_request(ws, {:auth_request, %AuthRequest{token: api_token}}),
{:jellyfish, :subscribed} <-
send_request(ws, {:subscribe_request, %SubscribeRequest{event_types: events}}) do
{:ok, ws}
else
{:jellyfish, :invalid_token} ->
{:error, :invalid_token}

{:error, _reason} = error ->
error
end
end

defp validate_events(events) do
with true <- Enum.all?(events, fn event -> event in @valid_events end) do
events
|> Enum.uniq()
|> Enum.map(&to_proto_event_type/1)
|> then(&{:ok, &1})
else
false ->
{:error, :invalid_event_type}
end
sgfn marked this conversation as resolved.
Show resolved Hide resolved
end

defp send_request(ws, content) do
request =
%ServerMessage{content: content}
|> ServerMessage.encode()

with {:ok, pid} <-
apply(WebSockex, fun, ["#{address}/socket/server/websocket", __MODULE__, state]),
:ok <- WebSockex.send_frame(pid, {:binary, auth_msg}) do
with :ok <- WebSockex.send_frame(ws, {:binary, request}) do
receive do
{:jellyfish, :authenticated} ->
{:ok, pid}

{:jellyfish, :invalid_token} ->
{:error, :invalid_token}
response ->
response
after
@auth_timeout ->
Process.exit(pid, :normal)
{:error, :authentication_timeout}
@response_timeout ->
Process.exit(ws, :normal)
{:error, :timeout}
end
else
{:error, _reason} = error ->
error
end
end

Expand All @@ -180,6 +260,11 @@ defmodule Jellyfish.Notifier do
state
end

defp handle_notification(%SubscriptionResponse{}, state) do
send(state.caller_pid, {:jellyfish, :subscribed})
state
end

defp handle_notification(%RoomNotFound{id: id}, state) do
{pid, pids} = List.pop_at(state.pending_subscriptions[id], -1)
state = put_in(state.pending_subscriptions[id], pids)
Expand Down Expand Up @@ -219,4 +304,6 @@ defmodule Jellyfish.Notifier do

state
end

defp to_proto_event_type(:server_notification), do: :EVENT_TYPE_SERVER_NOTIFICATION
end
6 changes: 3 additions & 3 deletions lib/jellyfish/peer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ defmodule Jellyfish.Peer do
def status_from_string(status) when status in @valid_status_strings,
do: String.to_atom(status)

defp type_from_proto(:WEBRTC), do: WebRTC
defp type_from_proto(:TYPE_WEBRTC), do: WebRTC

defp status_from_proto(:CONNECTED), do: :connected
defp status_from_proto(:DISCONNECTED), do: :disconnected
defp status_from_proto(:STATUS_CONNECTED), do: :connected
defp status_from_proto(:STATUS_DISCONNECTED), do: :disconnected
end
53 changes: 47 additions & 6 deletions lib/protos/jellyfish/server_notifications.pb.ex
Original file line number Diff line number Diff line change
@@ -1,35 +1,48 @@
defmodule Jellyfish.ServerMessage.EventType do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :EVENT_TYPE_UNSPECIFIED, 0
field :EVENT_TYPE_SERVER_NOTIFICATION, 1
end

defmodule Jellyfish.ServerMessage.RoomStateRequest.Option do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :ALL, 0
field :OPTION_UNSPECIFIED, 0
field :OPTION_ALL, 1
end

defmodule Jellyfish.ServerMessage.RoomState.Peer.Type do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :WEBRTC, 0
field :TYPE_UNSPECIFIED, 0
field :TYPE_WEBRTC, 1
end

defmodule Jellyfish.ServerMessage.RoomState.Peer.Status do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :CONNECTED, 0
field :DISCONNECTED, 1
field :STATUS_UNSPECIFIED, 0
field :STATUS_CONNECTED, 1
field :STATUS_DISCONNECTED, 2
end

defmodule Jellyfish.ServerMessage.RoomState.Component.Type do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :HLS, 0
field :RTSP, 1
field :TYPE_UNSPECIFIED, 0
field :TYPE_HLS, 1
field :TYPE_RTSP, 2
end

defmodule Jellyfish.ServerMessage.RoomCrashed do
Expand Down Expand Up @@ -155,6 +168,24 @@ defmodule Jellyfish.ServerMessage.RoomNotFound do
field :id, 1, type: :string
end

defmodule Jellyfish.ServerMessage.SubscribeRequest do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :event_types, 1,
repeated: true,
type: Jellyfish.ServerMessage.EventType,
json_name: "eventTypes",
enum: true
end

defmodule Jellyfish.ServerMessage.SubscriptionResponse do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3
end

defmodule Jellyfish.ServerMessage do
@moduledoc false

Expand Down Expand Up @@ -210,4 +241,14 @@ defmodule Jellyfish.ServerMessage do
type: Jellyfish.ServerMessage.RoomNotFound,
json_name: "roomNotFound",
oneof: 0

field :subscribe_request, 12,
type: Jellyfish.ServerMessage.SubscribeRequest,
json_name: "subscribeRequest",
oneof: 0

field :subscription_response, 13,
type: Jellyfish.ServerMessage.SubscriptionResponse,
json_name: "subscriptionResponse",
oneof: 0
end
2 changes: 1 addition & 1 deletion protos
Loading