on-closed not fired when client closes connection #375

Closed
malcolmsparks opened this issue Apr 3, 2018 · 8 comments

Comments

@malcolmsparks
Contributor

My context is that I'm trying to implement GraphQL subscriptions for Lacinia with aleph (with WS and SSE, but that's not relevant). A GraphQL subscription event might be for, say, my age changing. That happens once a year, so is rare. Ideally, if the client gets bored of waiting for that event they should be able to close the connection which should trigger the unsubscription of the event source.

Let's consider the following request handler:

(defn handler [req]
  (let [s (ms/stream)]

    (ms/on-closed s (fn [] (println "Stream closed, so I can unsubscribe!")))

    {:status 200
     :headers {"content-type" "text/event-stream"}
     :body s}))

When I connect using curl, I see an empty event-stream and the connection remains alive (as it should), but when I press Ctrl-C I don't get the expected "Stream closed, so I can unsubscribe!" message.

Let's amend the handler by adding some events:

(defn handler [req]
  (let [s (ms/stream)]

    (ms/on-closed s (fn [] (println "Stream closed!")))

    (future
      (loop [n 1]
        (when (and (< n 100)
                   @(ms/put! s (str "data: " n "\r\n\r\n")))
          (Thread/sleep 100)
          (recur (inc n)))))

    {:status 200
     :headers {"content-type" "text/event-stream"}
     :body s}))

Now, if I connect with curl, I do see the println message. To understand why, look at aleph.http.server/ring-handler.

:channel-inactive
([_ ctx]
  (when-let [s @stream]
    (s/close! s))
  (doseq [b @buffer]
    (netty/release b)))

When the client closes the channel we do indeed get the :channel-inactive event. However, the implementation then derefs the stream, which blocks until the next event (which in my contrived example might be a year away!). In the case I've got bored and closed the connection myself, the deref returns nil and so the (s/close! s) body isn't executed.

So I don't get any notification to unsubscribe my subscriber. Of course, I can wait (maybe a year) until the next event is generated, try to put! on the closed channel, and unsubscribe when it returns false. I can also live with the assumption that GraphQL events will be produced fairly frequently, but neither option feels efficient.
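A minimal sketch of the "put! returns false" behaviour mentioned above, using Manifold alone. It assumes the stream has already been closed locally with close!; it says nothing about when aleph closes the body stream after a client disconnects. The names `out`, `closed-signal` and `put-result` are illustrative only.

```clojure
(require '[manifold.stream :as s])

;; Illustrative names; this exercises Manifold only, not aleph.
(def out (s/stream))

;; Delivered by the on-closed callback once the stream is closed.
(def closed-signal (promise))
(s/on-closed out (fn [] (deliver closed-signal :closed)))

;; Close the stream ourselves, standing in for "the connection went away".
(s/close! out)

;; put! on a closed stream yields a deferred that resolves to false,
;; which a producer loop can treat as its cue to unsubscribe.
(def put-result @(s/put! out "data: 1\r\n\r\n"))
```

A producer that checks the result of each put! can therefore stop as soon as it sees false, but only at the moment it next has something to send.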

Is there a better way of signalling the closing client connection rather than attempting to put! on the stream? It seems that given the fact that :channel-inactive is called by Netty to aleph, there should be a way of signalling 'user-space' code.

@kachayev
Collaborator

kachayev commented Apr 4, 2018

However, the implementation then derefs the stream, which blocks until the next event (which in my contrived example might be a year away!).

stream here is an atom that holds netty/channel wrapped into buffered-source (when ready/necessary): https://github.com/ztellman/aleph/blob/5e22db5f99f259d0b3d3f70d6eddcd4a59cd6588/src/aleph/http/server.clj#L223 initialized with the value here or here. This means the deref here does not block; it just reads the current state of the atom. So the code you've mentioned actually just checks: if we have a stream going, close it.
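The difference can be sketched in plain Clojure. This is not aleph's code: `stream` below is a hypothetical stand-in atom and `close-if-present` is an illustrative helper that only mirrors the shape of the :channel-inactive branch.

```clojure
;; Hypothetical stand-in for the atom used in the handler; not aleph's code.
(def stream (atom nil))

(defn close-if-present
  "Derefs the atom and calls close-fn only when a value is present."
  [close-fn]
  (when-let [s @stream]
    (close-fn s)))

;; Nothing stored yet: the deref returns nil immediately (no blocking),
;; so close-fn is never called and the whole form evaluates to nil.
(def result-when-empty (close-if-present (fn [_] :closed)))

;; Once a value has been stored, the same call reaches close-fn.
(reset! stream :request-body)
(def result-when-set (close-if-present (fn [_] :closed)))
```

Dereferencing an atom never waits for a value, unlike dereferencing a promise, a future or a Manifold deferred.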

@malcolmsparks
Contributor Author

Thanks for this info, it's very useful. My initial theory as to why I'm not seeing the on-closed callback called must be wrong. Why do you think the callback isn't called?

Here's a test case: https://github.com/juxt/aleph-issue

I'm seeing this with alpha5. I can try with older releases.

@malcolmsparks
Contributor Author

@kachayev, thanks again. I now see that the stream is indeed an atom, which contains nil when Netty calls :channel-inactive.

:channel-inactive
([_ ctx]
  (when-let [s @stream]
    (s/close! s))
  (doseq [b @buffer]
    (netty/release b)))

The symbol s, in the case of ring-handler, refers to the stream used for transfer-encoding: chunked support, which I mistook for my own body stream. For raw-ring-handler it refers to the incoming stream, whereas I'm interested in a notification that my outgoing stream is being closed.

I guess I need to go back and think about this some more.

@kachayev
Collaborator

kachayev commented Apr 4, 2018

@malcolmsparks The code you're referring to, in both ring-handler and raw-ring-handler, is the code responsible for reading the request. In the first case, it converts the request (even a chunked one) into an InputStream (actually, something that looks like an InputStream, but that's irrelevant for now); in the second case, it just streams Netty's ByteBufs to you. But that's still the body of the request, not the response.

If you're looking for on-closed to be called on the stream that represents the body of your response, the relevant path is send-message, which should call send-streaming-body, which should close the source channel here, as close is properly propagated through the upstream<->downstream connection here 🤔

Hope that reveals some useful insights. But I cannot reproduce the issue. Using curl and your code:

(require '[aleph.http :as http])
=> nil
(require '[manifold.stream :as s])
=> nil
(defn handler [req]
  (let [s' (s/stream)]

    (s/on-closed s' (fn [] (println "Stream closed!")))
    (s/on-drained s' (fn [] (println "Stream is drained!")))

    (future
      (loop [n 1]
        (when (and (< n 100)
                   @(s/put! s' (str "data: " n "\r\n\r\n")))
          (Thread/sleep 100)
          (recur (inc n)))))

    {:status 200
     :headers {"content-type" "text/event-stream"}
     :body s'}))

=> #'user/handler
(def server (http/start-server handler {:port 8080}))
=> #'user/server
Stream closed!
Stream is drained!

@malcolmsparks
Contributor Author

Thanks @kachayev, I now understand the point about the request stream versus the response stream. The problem I'm trying to solve can be demonstrated if you comment out the (future ...) block in that code segment. In this case, there is no println message.

I understand that the response stream is closed upon send-message - I'm just trying to get a notification that the stream has been closed before having anything to write to it. As per my context at the beginning of this issue, I need to know that a client has closed a connection and thereby unsubscribe on an event bus prior to an event being available to attempt to send through the channel.

@kachayev
Collaborator

kachayev commented Apr 4, 2018

Ah, I see now.

It works this way:

(defn handler [s' req]
  (s/on-closed s' (fn [] (println "Stream closed!")))
  (s/on-drained s' (fn [] (println "Stream is drained!")))
  {:status 200
   :headers {"content-type" "text/event-stream"}
   :body s'})

=> #'user/handler
(def s' (s/stream))
=> #'user/s'
(def server (http/start-server (partial handler s') {:port 8080}))
=> #'user/server
(s/put! s' "Hello")
Stream closed!
Stream is drained!
=> << true >>

So, the idea here is that closing src does not propagate to the body, but when you try to put something onto the channel it figures out that it should be closed (due to the internals of the s/connect implementation that is used here).

We could probably close body here, but that would be a breaking change 🤔 And even in that case there's no reliable way to achieve what you're trying to do: on-closed will definitely introduce a race in your code, and even true as the result of put! cannot guarantee that the underlying channel is not closed yet (as we still have a level of indirection there).
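Since a put! is what lets the body notice the dead connection, one way to bound the delay is a periodic heartbeat. The following is only a sketch of such a workaround: it was not proposed in this thread, and it does not remove the race described above. `start-heartbeat!` is a hypothetical helper, not part of aleph or Manifold. The payload is an SSE comment line (a line starting with a colon), which event-stream clients ignore.

```clojure
(require '[manifold.stream :as s])

(defn start-heartbeat!
  "Hypothetical helper: puts an SSE comment line onto `body` every
  `interval-ms` milliseconds and stops as soon as a put! yields false."
  [body interval-ms]
  (future
    (loop []
      (when @(s/put! body ":\n\n")
        (Thread/sleep interval-ms)
        (recur)))))

;; Demo against a stream that is already closed: the first put! yields
;; false, so the loop exits straight away and the future completes with nil.
(def demo-body (s/stream))
(s/close! demo-body)
(def hb (start-heartbeat! demo-body 10))
```

In a handler, the heartbeat would be started on the response body stream next to the on-closed callback, so that the unsubscribe logic stays in one place and the heartbeat only forces the closed state to be noticed sooner.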

@ztellman
Collaborator

ztellman commented Apr 4, 2018

I think changing src to body in the linked line would be fine, and probably preferable. Manifold doesn't handle upstream propagation of closed streams right now, so performing the close! as far upstream as possible is ideal.

There is an open issue on Manifold to allow for upstream propagation, which I started poking at a while back, but didn't finish. Linking it here for cross-referencing: clj-commons/manifold#128.

@kachayev
Collaborator

@ztellman I think it's safe now to close this issue 🤔
