Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't track socket disconnect in __absinthe__:control channel #39

Open
makefunstuff opened this issue Aug 27, 2018 · 13 comments
Open

Can't track socket disconnect in __absinthe__:control channel #39

makefunstuff opened this issue Aug 27, 2018 · 13 comments

Comments

@makefunstuff
Copy link

Hello there!
I have the following problem: I have customer facing site and backoffice and I would like to show online status in backoffice using graphql subscriptions. Let’s assume that I don’t have direct access to Phoenix channels and for me it’s not possible to use channels at all, I have only absinthe subscriptions. How can I detect that socket connection is closed for a specific user in absinthe subscription level.
OFC that is easy to do in plain phoenix channels but when it comes to absinthe there I couldn’t find any proper way to do this. Maybe I’m doing something wrong or it was overlooked in absinthe:control channel implementation?

@makefunstuff makefunstuff changed the title How to track socket disconnect in __absinthe__:control channel? Can't track socket disconnect in __absinthe__:control channel? Aug 27, 2018
@makefunstuff makefunstuff changed the title Can't track socket disconnect in __absinthe__:control channel? Can't track socket disconnect in __absinthe__:control channel Aug 27, 2018
@benwilson512
Copy link
Contributor

Hey @makefunstuff there is not a way to do presence with Absinthe.Phoenix at this time, your best bet will probably be to leverage an additional phoenix channel and use normal tracking. This is definitely a feature we want to support, but we aren't even sure what the right design would be here yet given that Phoenix Presence has a very specific format in order to make its CRDT work. I'm not yet sure how we'd let someone wrap that info in GraphQL subscriptions or similar.

@makefunstuff
Copy link
Author

Thanks for the response, I think in my case the way to go is to build a decorator module on top of existing absinthe socket and add missing connection tracking functionality. Or I can try to prepare pr and add this to existing absinthe:control channel. What do you think?

@titzi
Copy link

titzi commented Feb 14, 2019

@makefunstuff How did you end up implementing it? I have a similar thing to solve and kind of lost right now.

@makefunstuff
Copy link
Author

You can define your own both socket and channel decorator modules, where you can track presence manually. At least that's how I did this.

@elixir4ever
Copy link

elixir4ever commented Jun 4, 2019

@makefunstuff Can you share how you implemented both socket and channel decorator modules?
Thanks

@makefunstuff
Copy link
Author

@elixir4ever, sure:

first I have defined the following socket decorator

defmodule MyAppWeb.AbsintheSocketDecorator do
  use Phoenix.Socket

  channel("__absinthe__:*", MyAppWeb.AbsintheChannelDecorator,
    assigns: %{
      __absinthe_schema__: MyAppWeb.Graphql.MySchemaModule
      __absinthe_pipeline__: nil
    }
  )
  def connect(params, socket) do
     # your connect logic
  end

  def id(_socket) do
    # your id logic
  end

  defdelegate put_options(socket, opts), to: Absinthe.Phoenix.Socket

  defdelegate put_schema(socket, schema), to: Absinthe.Phoenix.Socket
end

Then you should define channel decorator

defmodule MyAppWeb.AbsintheChannelDecorator do
  def join(topic, msg, socket) do
     # your join logic where you can tack join event
  end

   def terminate(reason, socket) do
      # your disconnect tracking logic
      {:ok, socket}
   end

  defdelegate handle_in(event, msg, arg2), to: Absinthe.Phoenix.Channel

  defdelegate default_pipeline(schema, options), to: Absinthe.Phoenix.Channel
end

add socket decorator in your endpoint ex:

  socket("/app", MyAppWeb.AbsintheSocketDecorator)

@elixir4ever
Copy link

@makefunstuff Thank you. This is really helpful.

@jonsgreen
Copy link

@benwilson512 I am working with a problem that might employ a similar solution. We are developing an api and want to use subscriptions as an alternative to webhooks. The client will execute a subscription to get updates when the results of an asynchronous process is complete and we will then publish the results to the topic channel when ready.

However, I am trying to figure out the best way to deal with the situation where a subscription gets disconnected and then rejoins after the results have been published. One solution would be to have a hook where we can publish the result when someone joins. I am thinking that using Phoenix Tracking when a the client joins would allow us to trigger a publish if the results are already in.

Is the above decorator strategy still the best approach for connecting Absinthe with a Phoenix Channel? Do you have any other suggestions for solving our problem?

@IkechukwuAKalu
Copy link

Thanks, @makefunstuff for the workaround.

In case anyone has an issue with the AbsintheChannelDecorator , don't forget to include use Phoenix.Channel

@bernardo-martinez
Copy link

bernardo-martinez commented Oct 7, 2020

Yep thanks, @makefunstuff for the workaround.

However when I try to implement it I get on the reason param: {{:badkey, :schema, ..., looks like put_schema is not been called...

I'm using absinthe 1.5 and absinthe_phoenix 1.5

@bernardo-martinez
Copy link

Yep thanks, @makefunstuff for the workaround.

However when I try to implement it I get on the reason param: {{:badkey, :schema, ..., looks like put_schema is not been called...

I'm using absinthe 1.5 and absinthe_phoenix 1.5

Ok, I've fixed it, as you said since this is a decorator, I was missing calling Absinthe.Phoenix.Channel.join(topic, msg, socket) at the end of MyAppWeb.AbsintheChannelDecorator.join like:

def join(topic, msg, socket) do
    # my logic here...
    Absinthe.Phoenix.Channel.join(topic, msg, socket)
  end

Thanks!

@autodidaddict
Copy link

What is the current preferred way of dealing with this? I'm in this exact situation where I have background processes that start when a subscription connects that I need to terminate when the subscription is no longer active. Should I still use the manually defined socket and channel decorators, or has there been more work done that isn't visible in this issue?

@tunchamroeun
Copy link

tunchamroeun commented Nov 30, 2021

I have do some customization to this file https://github.com/absinthe-graphql/absinthe_phoenix/blob/master/lib/absinthe/phoenix/channel.ex 😄

Channel Status (enter, leave and terminate)

  1. Subscribe (enter) the the channel
  2. Unsubscribe (leave) the channel
  3. Terminate (terminate) the channel
    Above notify in subscription config
defmodule Deps.Absinthe.Phoenix.Channel do
  use Phoenix.Channel
  alias Deps.Absinthe.Phoenix.Socket
  require Logger

  @moduledoc false

  @doc false
  def __using__(_) do
    raise """
    ----------------------------------------------
    You should `use Deps.Absinthe.Phoenix.Socket`
    ----------------------------------------------
    """
  end

  @doc false
  def join("__absinthe__:control", _, socket) do
    schema = socket.assigns[:__absinthe_schema__]
    pipeline = socket.assigns[:__absinthe_pipeline__]

    absinthe_config = Map.get(socket.assigns, :absinthe, %{})

    opts =
      absinthe_config
      |> Map.get(:opts, [])
      |> Keyword.update(
           :context,
           %{pubsub: socket.endpoint},
           fn context ->
             Map.put_new(context, :pubsub, socket.endpoint)
           end
         )

    absinthe_config =
      put_in(absinthe_config[:opts], opts)
      |> Map.update(:schema, schema, & &1)

    absinthe_config =
      Map.put(absinthe_config, :pipeline, pipeline || {__MODULE__, :default_pipeline})

    socket = socket
             |> assign(:absinthe, absinthe_config)
    {:ok, socket}
  end

  @doc false
  def handle_in("doc", payload, socket) do
    config = socket.assigns[:absinthe]
    with variables when is_map(variables) <- extract_variables(payload) do
      config_opts = [context: Map.merge(config.opts[:context], %{"status" => "enter", "subscriptionId" => ""})]
      opts = Keyword.put(config_opts, :variables, variables)
      query = Map.get(payload, "query", "")
      Absinthe.Logger.log_run(
        :debug,
        {
          query,
          config.schema,
          [],
          opts
        }
      )
      {reply, socket} = run_doc(socket, query, config, opts)

      Logger.debug(
        fn ->
          """
          -- Absinthe Phoenix Reply --
          #{inspect(reply)}
          ----------------------------
          """
        end
      )
      if reply != :noreply do
        {:ok, %{subscriptionId: subscriptionId}} = reply
        socket = subscription_status(socket, "enter", subscriptionId)
        {:reply, reply, socket}
      else
        {:noreply, socket}
      end
    else
      _ -> {:reply, {:error, %{error: "Could not parse variables as map"}}, socket}
    end
  end

  def handle_in("unsubscribe", %{"subscriptionId" => doc_id}, socket) do
    socket = subscription_status(socket, "leave", doc_id)
    pubsub =
      socket.assigns
      |> Map.get(:absinthe, %{})
      |> Map.get(:opts, [])
      |> Keyword.get(:context, %{})
      |> Map.get(:pubsub, socket.endpoint)

    Phoenix.PubSub.unsubscribe(socket.pubsub_server, doc_id)
    Absinthe.Subscription.unsubscribe(pubsub, doc_id)
    {:reply, {:ok, %{subscriptionId: doc_id}}, socket}
  end
  defp run_doc(socket, query, config, opts) do
    case run(query, config[:schema], config[:pipeline], opts) do
      {:ok, %{"subscribed" => topic}, context} ->
        %{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server} =
          socket
        :ok =
          Phoenix.PubSub.subscribe(
            pubsub_server,
            topic,
            metadata: {:fastlane, transport_pid, serializer, []},
            link: true
          )
        # add payload when needed in unsubscribe
        socket = Socket.put_options(
          socket,
          context: Map.merge(
            context,
            %{
              "payload" => %{
                "query" => query
              }
            }
          )
        )
        {{:ok, %{subscriptionId: topic}}, socket}

      {:ok, %{data: _} = reply, context} ->
        socket = Socket.put_options(socket, context: context)
        {{:ok, reply}, socket}

      {:ok, %{errors: _} = reply, context} ->
        socket = Socket.put_options(socket, context: context)
        {{:error, reply}, socket}

      {:error, reply} ->
        {reply, socket}
    end
  end

  defp run(document, schema, pipeline, options) do
    {module, fun} = pipeline

    case Absinthe.Pipeline.run(document, apply(module, fun, [schema, options])) do
      {:ok, %{result: result, execution: res}, _phases} ->
        {:ok, result, res.context}

      {:error, msg, _phases} ->
        {:error, msg}
    end
  end

  defp extract_variables(payload) do
    case Map.get(payload, "variables", %{}) do
      nil -> %{}
      map -> map
    end
  end

  @doc false
  def default_pipeline(schema, options) do
    schema
    |> Absinthe.Pipeline.for_document(options)
  end
  def terminate(reason, socket)do
    case reason do
      {:shutdown, :closed} ->
        subscription_id = socket.assigns.absinthe.opts[:context]
                          |> Map.get("subscriptionId")
        socket = subscription_status(socket, "terminate", subscription_id)
        {:noreply, socket}
      _ -> IO.puts "nothing"
    end
  end
  def handle_info(_, state) do
    {:noreply, state}
  end
  defp subscription_status(socket, status, subscription_id) do
    # status (enter, leave and terminate)
    config = socket.assigns[:absinthe]
    Map.get(socket.assigns.absinthe.opts[:context], "payload", "")
    |> case  do
         payload when payload != "" -> with variables when is_map(variables) <- extract_variables(payload) do
                                         query = Map.get(payload, "query", "")
                                         config_opts = [
                                           context: Map.merge(
                                             config.opts[:context],
                                             %{"status" => status, "subscriptionId" => subscription_id}
                                           )
                                         ]
                                         opts = Keyword.put(config_opts, :variables, variables)
                                         context = socket.assigns.absinthe.opts[:context];
                                         run(query, config[:schema], config[:pipeline], opts)
                                         Socket.put_options(
                                           socket,
                                           context: Map.merge(
                                             context,
                                             %{
                                               "subscriptionId" => subscription_id
                                             }
                                           )
                                         )
                                       end
         _ -> socket
       end
  end
end

My Subscription Config

Three status will run here

field :update_expiration_msg, :string do
      arg(:quiz_template_grouping_id, non_null(:id))
      config(
        fn args, %{
          context: %{
            "status" => status,
            "subscriptionId" => subscription_id
          }
        } ->
          IO.puts("Channel Status")
          IO.inspect status
          IO.puts("Subscription Id")
          IO.inspect subscription_id
          if(status == "enter" and subscription_id != "")do
            add_schedule_job(subscription_id, args)
          end
          if((status == "leave" || status == "terminate") and subscription_id != "")do
            remove_schedule_job(subscription_id)
          end
          {:ok, topic: "#{args.quiz_template_grouping_id}"}
        end
      )
    end

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

9 participants