Event streaming from api-server #119

Closed
TimoKramer opened this issue Mar 6, 2023 · 14 comments
Comments

@TimoKramer
Member

For a dynamic web interface for Bob we need it to update based on some kind of event stream. The event stream should be provided by the apiserver since that is the point we are connecting to from outside. Updates on the status of pipelines should be streamed to all the consumers that are connected to the events-endpoint.

We could utilize server-sent events (SSE) for this, or WebSockets. SSE is the more lightweight approach and provides one-way communication, which is what we need. There is a Ring middleware for it that seems to be all we need. WebSockets are also widely used and would be an adequate option as well.

We could also make the /events-endpoint both static and dynamic. For streaming events the consumer could send a GET request with a parameter like ?stream=true. For manual requests we could offer functionality like Docker offers with its events endpoint, namely since, until and filter. A rough sketch of how the handler could branch on these parameters is below.
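
A minimal sketch of that branching, assuming Ring-style request maps with wrap-params applied; the handler name and response shapes are illustrative only, not a final API:

(defn events-handler
  ;; assumes ring.middleware.params/wrap-params, so query params are string-keyed
  [{{:strs [stream since until]} :query-params}]
  (if (= stream "true")
    ;; dynamic case: hand the body off to an SSE stream
    {:status 200
     :headers {"content-type" "text/event-stream"}
     :body ""}
    ;; static case: return the stored events between since and until
    {:status 200
     :body {:since since :until until :events []}}))

e.g. GET /events?stream=true for streaming, GET /events?since=...&until=... for a one-off query.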

@lispyclouds
Member

Looks good! The one thing I'd consider is ease of access from the command-line tooling as well, apart from the Web UI.

@TimoKramer
Member Author

Ok, I took a look at xtdb's listen and it didn't look like it can stream :status changes for us. So basically we would have to implement our own events.
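
For reference, roughly what listen exposes (a quick, untested sketch assuming XTDB 1.x): we only get transaction-level events, so filtering out pipeline :status changes would be on us.

(require '[xtdb.api :as xt])

;; in-memory node just for illustration
(def node (xt/start-node {}))

(xt/listen node
           {::xt/event-type ::xt/indexed-tx
            :with-tx-ops? true}
           (fn [{:keys [committed?] ::xt/keys [tx-ops]}]
             ;; fires once per indexed transaction with the raw tx ops;
             ;; digging the pipeline :status out of these would be our job
             (when committed?
               (println "indexed tx ops:" tx-ops))))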

@TimoKramer
Member Author

Or we could use mulog?!

@aldosolorzano
Contributor

I found https://mercure.rocks/, which looks like an interesting option for this problem

@TimoKramer
Member Author

TimoKramer commented May 5, 2023

Apiserver needs to consume events for an events-endpoint.

|  | static request with no updates | use postgres | use rabbitmq streams |
| --- | --- | --- | --- |
| deliver state | xt/q | next/execute! | environment.consumerBuilder() |
| event streaming | query repeatedly client-side | LISTEN events | environment.consumerBuilder() |
| consistency | always consistent | there might be lost events between SELECT and LISTEN | should be consistent |
| effort | lowest effort | configure, connect and read/write postgres; problem when xtdb-backend is switched 😟 | java interop with streams-client; install streams-plugin 😐 |
| performance expectation | bad since whole datasets are queried repeatedly | supposedly good because PG ☺️ | supposedly best because that's what it's made for 😃 |
| quirks | none, but not really a solution 😑 | FIFO queueing and LISTEN are more esoteric PG features 😑 | seems to be the most straightforward solution |
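
For the postgres column, a rough, untested sketch of what LISTEN-based streaming could look like with next.jdbc and the stock pgjdbc driver; the datasource config and the events channel name are made up:

(require '[next.jdbc :as jdbc])
(import '[org.postgresql PGConnection])

(def ds (jdbc/get-datasource {:dbtype "postgresql" :dbname "bob"}))

(with-open [conn (jdbc/get-connection ds)]
  ;; subscribe to the channel the runners would NOTIFY on
  (jdbc/execute! conn ["LISTEN events"])
  (let [pg-conn (.unwrap conn PGConnection)]
    (loop []
      ;; poll for notifications, waiting up to 1s each round
      (doseq [n (.getNotifications pg-conn 1000)]
        (println "event:" (.getParameter n)))
      (recur))))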

@lispyclouds
Member

Another thing to keep in mind is that we should start subscribing to the stream only when the request comes into /events. We should not subscribe before that, as it would start accumulating messages on the API servers, making them stateful.
Also we should check if RabbitMQ supports consumer groups. All of the API servers form a single consumer and the read state should be treated like that.

@coro

coro commented May 12, 2023

Rabbit's equivalent of consumer groups is the Single Active Consumer pattern for streams. If I understand your use case correctly, you'd want all of your APIServer consumers to have this enabled so that only one of them consumes from the stream at a time.

@lispyclouds
Member

lispyclouds commented May 13, 2023

@TimoKramer Just to kick the tires, tried this piece of code:

(ns rabbit.main
  (:import
   [com.rabbitmq.stream ConfirmationHandler Environment MessageHandler]))

(def stream "my-events")

(def environment (.. Environment builder build))

(def producer (.. environment
                  producerBuilder
                  (stream stream)
                  (name "a-producer")
                  build))

(def consumer (.. environment
                  consumerBuilder
                  (stream stream)
                  (name "a-consumer")
                  singleActiveConsumer
                  (messageHandler (reify MessageHandler
                                    (handle [_ context message]
                                      (println "This is a context:" context)
                                      (println "This is a message:" (String. (.getBodyAsBinary message))))))
                  build))

(defn publish
  [producer ^String message]
  (let [message (.. producer
                    messageBuilder
                    (addData (.getBytes message))
                    build)]
    (.send producer message (reify ConfirmationHandler
                              (handle [_ _])))))

(comment
  (set! *warn-on-reflection* true)

  ;; create the stream
  (.. environment
      streamCreator
      (stream stream)
      create)

  (publish producer "bar")

  (publish producer "foo")

  (.close producer)

  (.close consumer))

seems to do quite well and should serve the needs. Leaving this as an example.
Used this to start a rabbit instance for the test:

$ podman run -it --name rmq --rm -p 5552:5552 -p 5672:5672 -p 15672:15672 -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' rabbitmq:management-alpine

$ podman exec rmq rabbitmq-plugins enable rabbitmq_stream

@TimoKramer
Member Author

TimoKramer commented May 14, 2023

> Rabbit's equivalent of consumer groups is the Single Active Consumer pattern for streams. If I understand your use case correctly, you'd want all of your APIServer consumers to have this enabled so that only one of them consumes from the stream at a time.

@coro I need all the apiservers to subscribe to a stream and all should receive all messages. Single active consumer seems to only deliver to the 'active' one, so it doesn't look like the right fit as I see it. Will read the docs more closely now. Thanks! 👍

@lispyclouds
Member

On further thought I think we should NOT use the single active consumer:

  • We have multiple load-balanced API servers and any one could service the /events request.
  • Servicing a stream via one of the servers makes that one stateful, and if that's the single active one, subsequent requests landing on the inactive ones won't answer anything.
  • We need all of them to get the messages, but only start streaming when the GET /events request comes.
  • We set the stream to have a max capacity, either temporal or size-based (not sure yet), and it keeps cleaning the old messages up from the beginning.
  • Each of the API servers subscribes from the first and not the last read position every time and streams the contents out. That way there is no read tracking and the API servers stay stateless (rough sketch below).

Will think more, lemme know what y'all think of this?
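
A rough, untested sketch of the retention + read-from-the-beginning part, reusing the stream client from the snippet above; the stream name and the limits are placeholders, not decided values:

(import '[com.rabbitmq.stream ByteCapacity Environment MessageHandler OffsetSpecification]
        '[java.time Duration])

(def environment (.. Environment builder build))

;; cap the stream so old events keep getting cleaned up from the beginning
(.. environment
    streamCreator
    (stream "bob-events")
    (maxAge (Duration/ofHours 1))          ;; temporal cap
    (maxLengthBytes (ByteCapacity/MB 50))  ;; and/or a size cap
    create)

;; every api server subscribes from the first offset; no consumer name and no
;; offset tracking, so the servers stay stateless
(def consumer (.. environment
                  consumerBuilder
                  (stream "bob-events")
                  (offset (OffsetSpecification/first))
                  (messageHandler (reify MessageHandler
                                    (handle [_ _ message]
                                      (println (String. (.getBodyAsBinary message))))))
                  build))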

@lispyclouds
Member

lispyclouds commented May 14, 2023

Here are two Ring handlers that implement a producer and a consumer and send SSE:

(require '[clojure.java.io :as io]
         '[ring.core.protocols :as p])

(defn publish
  [{{{:keys [message]} :path} :parameters}]
  (let [msg (.. producer
                messageBuilder
                (addData (.getBytes message))
                build)]
    (.send producer msg (reify ConfirmationHandler
                          (handle [_ _])))
    {:status 200
     :body {:message "Ok"}}))

(defn events
  [_]
  {:status 200
   :headers {"content-type" "text/event-stream"
             "transfer-encoding" "chunked"}
   :body (reify p/StreamableResponseBody
           (write-body-to-stream [_ _ output-stream]
             (with-open [w (io/writer output-stream)]
               (let [complete (promise)
                     consumer (.. environment
                                  consumerBuilder
                                  (stream stream-name)
                                  (messageHandler (reify MessageHandler
                                                    (handle [_ _ message]
                                                      (try
                                                        (doto w
                                                          ;; SSE format: data: foo\n\n
                                                          (.write (str "data: " (String. (.getBodyAsBinary message)) "\n\n"))
                                                          (.flush))
                                                        (catch Exception _
                                                          (println "client disconnected")
                                                          (deliver complete :done)))))) ;; unblock
                                  build)]
                 @complete ;; block til done
                 (.close consumer)
                 (.close output-stream)))))})

the first handler handles: POST /publish/foo
the second one handles GET /events. eg: curl http://localhost:7777/events

One or more clients can make the GET /events call and they all should start seeing the events from the beginning.

The consumer implementation is a bit convoluted due to the way Jetty/Ring streams work, open to better ideas! The weird promise is there to clean up the consumer. Couldn't find a way to check if the stream is closed to know when the client has disconnected. Catching the exception when the write fails and signalling seems to be the best idea I can think of for now.

@lispyclouds
Member

lispyclouds commented May 15, 2023

Another observation: most browsers limit the total number of concurrent persistent HTTP connections to the same domain to some low number. 6 for HTTP 1.1 and 100 for HTTP 2/3 afaik. We can keep this in mind if this starts being a problem with things like multiple tabs. This isn't an issue for non-browser clients.

@TimoKramer
Member Author

Rebuilt the PR with RabbitMQ streams and SSE.

TimoKramer added a commit to TimoKramer/bob that referenced this issue Jun 1, 2023
This commit adds streaming of events from runners to
the apiservers via RabbitMQ streams and streaming of
events from apiservers to connected clients via SSE.

This was necessary to enable building dynamic
frontends that can show updates on pipelines without
reloading. Please see the ADR for background info
and the issue bob-cd#119 and PR bob-cd#126.

- Closes bob-cd#119
TimoKramer added a commit to TimoKramer/bob that referenced this issue Jun 2, 2023
lispyclouds pushed a commit that referenced this issue Jun 5, 2023
@lispyclouds
Member

implemented in efddf35

lispyclouds added a commit to bob-cd/bob-deploy that referenced this issue Jun 5, 2023
lispyclouds added a commit to bob-cd/bob-deploy that referenced this issue Jun 6, 2023
lispyclouds added a commit to bob-cd/bob-deploy that referenced this issue Jun 6, 2023
lispyclouds added a commit to bob-cd/bob-deploy that referenced this issue Jun 6, 2023