
Change threading model for subscribers #101

Merged 8 commits into master on Mar 30, 2023

Conversation

diegodiv (Contributor):

The previous model for subscribers was monolithic: a single thread managed all the subscribers, and if one of them failed to answer, it could block the entire subscription mechanism. We want to avoid this kind of "single point of failure" architecture, so we opted for a different threading model.

Now each subscriber has a dedicated thread that handles sending updates to it. This thread is killed on timeout or when the client unsubscribes. Icepeak's main thread feeds these threads through dedicated bounded queues (one per subscriber).
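
For illustration, here is a minimal sketch of that model; startSubscriber, the queue capacity of 16, and the direct sendTextData call are assumptions for this example, not the actual icepeak code:

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue)
import Control.Monad (forever)
import Data.Aeson (Value, encode)
import qualified Network.WebSockets as WS

-- One bounded queue per subscriber; the main thread writes updates into it,
-- while a dedicated thread drains the queue and pushes each value over the websocket.
startSubscriber :: WS.Connection -> IO (TBQueue Value, IO ())
startSubscriber conn = do
  queue <- newTBQueueIO 16  -- illustrative capacity, not icepeak's actual setting
  tid <- forkIO $ forever $ do
    value <- atomically (readTBQueue queue)  -- blocks until an update arrives
    WS.sendTextData conn (encode value)      -- encode gives a lazy ByteString
  pure (queue, killThread tid)  -- the kill action is run on unsubscribe or timeout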

This PR starts as a draft because I want #100 merged first.

@diegodiv diegodiv requested review from fatho and xrvdg March 23, 2023 16:36
@diegodiv diegodiv removed the request for review from xrvdg March 24, 2023 08:08
server/src/Core.hs (outdated review thread, resolved)
@diegodiv diegodiv marked this pull request as ready for review March 24, 2023 14:56
fatho (Member) left a comment:

Looks like a good approach, just some polishing remarks below.

Also, I think there were more potential solution ideas floating around. Did you collect them somewhere and write down their pros and cons? It would be good to document how we ended up with this new design.

server/src/Subscription.hs (review thread, resolved)
server/src/WebsocketServer.hs (outdated review thread, resolved)
writeToSubQueue :: TBQueue Value -> Value -> IO ()
writeToSubQueue queue val = atomically $ do
isFull <- isFullTBQueue queue
unless isFull $ writeTBQueue queue val
Member:

For a production-ready version, it would be interesting to count how many updates we lose due to individual queues being full (so just having a counter metric here for the isFull case).
That would allow us to measure whether the chosen queue length is right (just a few lost events occasionally) or too small (many events lost).
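
A minimal sketch of what that could look like, assuming a plain TVar Int per subscriber as the counter (in practice this would be wired into icepeak's metrics setup; the name writeToSubQueueCounting is made up):

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, isFullTBQueue, writeTBQueue)
import Control.Concurrent.STM.TVar (TVar, modifyTVar')
import Data.Aeson (Value)

-- Same shape as writeToSubQueue, but counting every update that is dropped
-- because this subscriber's queue is full.
writeToSubQueueCounting :: TVar Int -> TBQueue Value -> Value -> IO ()
writeToSubQueueCounting droppedCounter queue val = atomically $ do
  isFull <- isFullTBQueue queue
  if isFull
    then modifyTVar' droppedCounter (+ 1)  -- update lost: bump the counter
    else writeTBQueue queue val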

Contributor:

I think it would also be good to log a message every time we discard an update because the queue is full.

Member:

> I think it would also be good to log a message every time we discard an update because the queue is full.

What extra information would a log give us in this case that we wouldn't get from a counter metric? I would expect such a log to be rather spammy.

Contributor:

> What extra information would a log give us in this case that we wouldn't get from a counter metric?

I would log the actual message that is being discarded (which you wouldn't get from the counter). I think this is useful for debugging, since you would get a direct log message; otherwise you would need to know that icepeak discards messages sometimes and would need to check the counter metric.

> I would expect such a log to be rather spammy.

I think it shouldn't be too bad, since we should just close the TCP connection to any customer that is timing out (@DiegoDiverio is that what happens now btw? Is the timeout value configurable?). So you would only get some log spam until the connection is closed.

diegodiv (Contributor, Author) commented Mar 28, 2023:

This is what happens now (the timeout), but the value is a constant in the code (30s). We can improve on that by making it a configuration option.

I don't know the frequency of updates, but if updates occur frequently, you would end up with a lot of spam each time a client times out or takes a long time to answer. However, it doesn't happen often that a client exhibits such behaviour.

That said, I would advise against letting this PR grow too large: we should solve one problem at a time to keep the PR readable, especially on sensitive matters like threading-related improvements. After the relevant PRs are merged, we can then release a new version of Icepeak. I'll make a comment below on what I think should be included in this PR.

Member:

> I would log the actual message that is being discarded (which you wouldn't get from the counter). I think this is useful for debugging, since you would get a direct log message; otherwise you would need to know that icepeak discards messages sometimes and would need to check the counter metric.

I don't think that would help with debugging, as a client being too slow to receive data (or just a connection that was silently dropped) is not a problem we can reasonably take action on.


Adding a metric does have value though, because it allows us to actually observe that things are working correctly. (Logs would too, of course, but a metric is both easier to observe and has lower overhead.) That's why I would also disagree with

> I would advise against letting this PR grow too large: we should solve one problem at a time to keep the PR readable

The change in this PR is pretty big in terms of how icepeak operates, and it leaves us with no way of observing that it is working correctly and not losing half the updates. (We would probably eventually notice if it affected what our users see, but that's too late.)

A simple counter metric would be a low-overhead way of achieving this: as long as the metric is zero (or rising very slowly), it means that updates are still being delivered as intended.

If you want to keep this PR small, you can create a second PR based on this one, then merge them together before deploying.

Contributor:

Ok, makes sense. Let's go with the counter metric.

diegodiv (Contributor, Author):

Yeah, that's exactly the development pattern I was looking for. I won't deploy before all the changes we discussed here have been merged.

ReinierMaas (Contributor) left a comment:

I have an open question about the design:

Should we drop the message that we try to insert? Another option is to drop the next message that we are about to send.

A simple representation to get the idea across:

queue (max 3): m3->m2->m1
update: m4
choice: drop m1 or drop m4:
* queue (drop m1): m4->m3->m2
* queue (drop m4): m3->m2->m1

Pros of dropping m1 (the oldest):

* We will eventually get the client to the last known state.

Cons:

* We will skip intermediate states from the client's point of view (see the sketch after this list).
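
A minimal sketch of the drop-m1 (drop-oldest) variant on top of the existing TBQueue; the helper name writeDroppingOldest is made up for this example:

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, isFullTBQueue, tryReadTBQueue, writeTBQueue)
import Control.Monad (void, when)
import Data.Aeson (Value)

-- Drop the oldest queued update (m1) to make room for the newest one (m4),
-- so the subscriber eventually converges on the last known state.
writeDroppingOldest :: TBQueue Value -> Value -> IO ()
writeDroppingOldest queue val = atomically $ do
  isFull <- isFullTBQueue queue
  when isFull $ void (tryReadTBQueue queue)  -- discard m1
  writeTBQueue queue val                     -- enqueue m4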

diegodiv (Contributor, Author):

> Another option is to drop the next message that we are about to send.

I think this is a very good idea. If you want something like a progressive evolution of the state while ensuring you don't lose any of the intermediate information, you shouldn't be using a KV store like Icepeak in the first place, since it doesn't provide that kind of guarantee.

But, conversely, if you're only interested in the current/last value for a given key, a KV store is a good fit.

diegodiv (Contributor, Author) commented Mar 28, 2023:

A summary of the improvements to be included in this PR:

  • Discard the oldest element instead of the newest one. By the way, when the queue gets full, why not just empty it, since the intermediate values will be out of date? What is the use case for sending them all to the subscriber? We might even just need an MVar (or another appropriate piece of mutable state) instead of queues, and simply record that the value has been updated.
  • Add configuration options for the length of the subscriber queues and the duration of the timeout (a hedged sketch of such options follows after this list).

Not included in this PR:

  • Change imports and the mtl version to use the CPS Writer monad.
  • Add logs and metrics to get information on Icepeak's behaviour regarding subscribers.
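
A sketch of what such options could look like, assuming an optparse-applicative style parser (whether that matches server/src/Config.hs exactly is not shown here); the record, field, and flag names are illustrative, not the actual icepeak configuration:

import Options.Applicative

-- Illustrative additions to the configuration record.
data SubscriberConfig = SubscriberConfig
  { configQueueCapacity :: Int  -- per-subscriber queue length
  , configPingInterval  :: Int  -- seconds between keep-alive pings
  }

subscriberConfigParser :: Parser SubscriberConfig
subscriberConfigParser = SubscriberConfig
  <$> option auto
        (long "queue-capacity" <> metavar "N" <> value 16
         <> help "Capacity of each subscriber's update queue")
  <*> option auto
        (long "ping-interval" <> metavar "SECONDS" <> value 30
         <> help "Interval between keep-alive pings sent to websocket clients")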

fatho (Member) commented Mar 28, 2023:

> By the way, when the queue gets full, why not just empty it, since the intermediate values will be out of date? What is the use case for sending them all to the subscriber? We might even just need an MVar (or another appropriate piece of mutable state) instead of queues, and simply record that the value has been updated.

Makes sense, a subscription always gets the full values for the path it registered, right? Then indeed only the most recent change would matter for the existing uses.

@diegodiv diegodiv self-assigned this Mar 28, 2023
ReinierMaas (Contributor):

> Makes sense, a subscription always gets the full values for the path it registered, right?

Yes, this is my understanding as well. Nothing about the final state gets lost when intermediate states are skipped.

> We might even just need an MVar (or another appropriate piece of mutable state) instead of queues, and simply record that the value has been updated.

The more general way to think about this: using an MVar is equivalent to setting the queue's size to 1.

> What is the use case for sending them all to the subscriber?

The question here is whether we want the intermediate states displayed to the user. The feeling of progress might otherwise be missing because intermediate states are skipped.
(I don't think this is an actual problem, more a theoretical one.)

diegodiv (Contributor, Author) commented Mar 28, 2023:

> The more general way to think about this: using an MVar is equivalent to setting the queue's size to 1.

This is true, but we should use an explicit structure that conveys our intent, whichever one it is. I think MVar might convey this better, but a 1-element queue is fine as well, as long as we structure the code so that the intention is clear.

> What is the use case for sending them all to the subscriber?

> The question here is whether we want the intermediate states displayed to the user. The feeling of progress might otherwise be missing because intermediate states are skipped. (I don't think this is an actual problem, more a theoretical one.)

I think that most of the time we'll actually end up sending these updates: the subscriber thread should wake up each time the MVar/TBQueue contains an element, because of the blocking read. There is always the possibility that it skips an update when several updates arrive in quick succession.
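
For illustration, a minimal sketch of such a latest-value slot using a TMVar, which gives the same blocking read; an MVar with tryTakeMVar/putMVar would look very similar for a single writer. The names publishLatest and awaitLatest are made up:

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar (TMVar, putTMVar, takeTMVar, tryTakeTMVar)
import Data.Aeson (Value)

-- Writer side: overwrite whatever is pending, so only the latest value survives.
publishLatest :: TMVar Value -> Value -> IO ()
publishLatest slot val = atomically $ do
  _ <- tryTakeTMVar slot  -- discard a pending, now out-of-date value (if any)
  putTMVar slot val

-- Reader side (the subscriber thread): block until a value is available, then take it.
awaitLatest :: TMVar Value -> IO Value
awaitLatest slot = atomically (takeTMVar slot)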

rkrzr (Contributor) commented Mar 28, 2023:

> This is true, but we should use an explicit structure that conveys our intent, whichever one it is. I think MVar might convey this better, but a 1-element queue is fine as well, as long as we structure the code so that the intention is clear.

Good idea. For all of our current use cases an MVar would work just fine.

(I was first thinking that there might be other cases where intermediate states are important, say, a state machine, but since that's not a current requirement, let's not optimize for that)

fatho (Member) left a comment:

The MVar-based queue looks good, just a remark about the "timeout":

connection <- WS.acceptRequest pending
-- Fork a pinging thread, for each client, to keep idle connections open and to detect
-- closed connections. Sends a ping message every 30 seconds.
-- Note: The thread dies silently if the connection crashes or is closed.
-WS.withPingThread connection 30 (pure ()) $ handleClient connection path core
+WS.withPingThread connection timeout (pure ()) $ handleClient connection path core
Member:

The above comment (and subsequently looking into the implementation) makes me think that the 30 isn't actually a timeout. It seems to only control how often the server sends a keep-alive ping to prevent intermediate proxies from closing the connection.

In our case, such a (reverse) proxy is nginx, which we use on the public-facing side of icepeak to terminate HTTPS connections. It seems that the actual timing out happens when nginx hasn't heard from the client in a while and closes the connection. That connection close is then received by icepeak, which in turn makes the send operations in the ping thread and the update thread, and the receive operation in keepTalking, fail.
(NB: Since nginx and icepeak currently run on the same host for us, nginx closing the connection will always be observed, as there is no packet loss on a localhost connection, or at least there shouldn't be. If they ran on different servers, the packet signalling that the connection was closed might actually get lost, resulting in a truly stuck connection until the kernel decides the connection is dead, which AFAIK is a much longer timeout.)

It's still a good idea to make this configurable, since other setups might need other ping intervals, but it's not doing what we intend it to do. Adding proper timeouts within icepeak is a different issue.

diegodiv (Contributor, Author):

Thanks for the explanation! I wasn't sure what closed the connection. I'll create an issue to add a proper timeout to Icepeak itself. Do you think that requiring a pong using ServerOptions.serverRequirePong could fit the bill?

Member:

According to the WebSocket protocol, a pong should always be sent upon receiving a ping. So that option indeed seems like what we need: if no pong was received within the specified interval, the connection is closed.
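
For reference, a sketch of how that option could be enabled when running a standalone websockets server; whether icepeak's server setup goes through runServerWithOptions, and the exact type of serverRequirePong, should be checked against the installed websockets version. The 60-second value is illustrative:

import qualified Network.WebSockets as WS

-- Close a connection if no pong was received within 60 seconds.
runWithPongTimeout :: WS.ServerApp -> IO ()
runWithPongTimeout app =
  WS.runServerWithOptions
    WS.defaultServerOptions { WS.serverRequirePong = Just 60 }
    app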

server/src/Config.hs (outdated review thread, resolved)
@diegodiv diegodiv requested a review from fatho March 30, 2023 08:38
fatho (Member) left a comment:

Small nit regarding the renamed option:

server/src/Config.hs (outdated review thread, resolved)
fatho (Member) left a comment:

LGTM!

diegodiv (Contributor, Author):

@OpsBotPrime merge

OpsBotPrime (Contributor):

Pull request approved for merge by @DiegoDiverio, rebasing now.

Diego Diverio and others added 2 commits March 30, 2023 10:59
OpsBotPrime (Contributor):

Rebased as 13b214b, waiting for CI …

OpsBotPrime (Contributor):

CI job 🟡 started.

@OpsBotPrime OpsBotPrime merged commit 13b214b into master Mar 30, 2023
@OpsBotPrime OpsBotPrime deleted the dd/subscribers-threading branch March 30, 2023 09:11