-
-
Notifications
You must be signed in to change notification settings - Fork 884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug]: Federation throughput in a synchronous manner is limited to network distance #4529
Comments
Just collecting some further points from discussions on Matrix:
|
Here are some potential solutions.
|
Solution through brainstorming on matrix to parallelize sending requests
We end up with the following sending queues Queue A ids: 1,2,3,6,7 Now each queue parallel to and independent from other other queues, follows the existing logic to send its post to the target instance. So queue A will send 1 then 2 then 3 etc. If any fails, the queue aborts anything subsequent. So if queue A failed to send ID 3, it will abort, so that 3,6 and 7 will remain in the overarching queue to send. However the failure of 3, will not stop 4,8 and 5 from going through. Now on the next iteration of sending, 1,2,4,8,5 are gone, so the next queues will pick up 3,6,7 to send, along with 7 other IDs and split them into individual queues. This would allow an instance to send 1-n requests in parallel without ever running the risk of sending them in the wrong chronological order. |
@db0 In particular we can assign activities to a specific queue by post_id: |
You mean so that you have different posts per queue? Sure that could work as well. I like the idea of using the exact post ID for easier troubleshooting potential myself. |
Do you have a source on that? As far as I'm aware it does keep them alive and thus for replication has one persistently open connection. Other than that, I'll restate what I wrote on Matrix: I think we can solve this in a fairly simple way without any robustness issues: Instead of sending one activity, waiting for the 200 response, then sending the next activity, we can instead send multiple activities (sequentially down the same HTTP connection) and only start waiting for a response when 8 are currently in transit. The only thing unclear to me is how this can be implemented. On tokio's side using a channel with .buffered(8) on the receiving end should work, but I'm not sure how to make reqwest send multiple requests while still ensuring they are in the same sequential connection. This way, the internet latency does not matter as long as the parallel number chosen (8) is > than the internet latency divided by the internal processing time. E.g. if ping is 320ms and internal processing is 10ms, then 32 requests would need to be in the queue at once to not slow things down. Note that this does not change anything about the internal latency of an instance, that would still be sequential per receiving instance. But it removes all the latency outside of the Rust process. |
Maybe we can use something like this to get notified when the entire activity is sent, and then send out the next one. However that would require changes inside the federation library and exposing it in the public api. It would be easier and probably sufficient in practice if we simply wait eg 100ms between requests. |
Is this HTTP Response Streaming ( For Also a good note is that HTTP/2 basically always does chunked encoding, so the above applies only to 1.1. |
That is exactly the problem with parallel requests in isolation and it's non-trivial to solve, which is why I suggested a more robust split-queue |
Why is it non-trivial to solve? if the library provides an option to do it it's trivial to solve. the only problem is that by default it apparently uses HTTP2-pipelining which is not in-order |
Does the library provide an option then? |
It's HTTP pipelining The docs you sent seem to be about responses, but here we really only care about request and request body |
This helps you send multiple requests, not ensure they're processed in order which is the non-trivial part |
they are guaranteed to be sent in order and received in order so processing them in order should not be hard |
famous last words :D I don't think the http pipelining ensure that the response you get will be 200 before sending the next request. only that there will be a response. How does it guarantee for example that a request with ID 1 won't take too long to process before ID 2 hits the target? |
on the receiving side the requests can still be processed fully sequentially, no need for parallelism. note this only solves the problem in this ticket and not the one you had with internal latency |
Very important from the article in wikipedia From the linked source If I'm not mistaken, the POSTS to inbox, are not idempotent. A connection loss while sending will cause issues. |
it does look like reqwest doesn't really have options for this, probably since http1.1 pipelining is mostly seen as superseeded by http2 multiplexing, which is better in most ways except that it doesn't guarantee any sequentiality. so probably we should use just normal http2 multiplexing (which should just work with I think we can semi-ignore the sequentiality problem (e.g. just add a 10ms delay between sends) though because: If an instance is catching up, and processing e.g. 100ms per activity, and we have a parallelity of 8, then while the first 8 events will be sent simultaneously and potentially processed out of order, after that each next request will wait for the OK response 8 requests earlier, which means it will have a average delay of 10ms after the previous activity. |
If you don't ensure for sequential you can end up with situations of the previous edits or votes overriding newer edits/votes if they are in the same pipeline. Likewise, a connection loss in the middle of the request will cause you issues which you can't resolve by resending, since you're not idempotent. |
Another alternative that's still simpler and less overhead than adding per-thread or per-community queues would be to add a field to each json activity or http request header with the sequential id and on the receiving side add (if the number is given) a priority queue to which events are added and processed immediately if the sequential id was incremented by 1 and with a max delay of 10s or so if it was incremented by > 1 |
Anything which extends the apub protocol, will not be handled by other software properly. |
it's a compatible change so it doesn't affect anything else |
What do you mean by "compatible change"? |
it just adds a header or json field that will be ignored if not understood. pseudo code:
|
That's my point. If you rely on something that can be ignored, then non-lemmy software (kbin, sublinks, mastodon, piefed etc) can experience significant issues, such as bad edits, missing content when issues occur |
but i have to say since completely ignoring activity sequence in a much much worse way than the thing above is how all of lemmy worked before 0.19 i don't think this minor sequencing issue is really going to cause trouble. on high-volume instances with a parallelity of like 10 it's very unlikely there's a conflicting event is <10 events next to another conflicting one because you don't edit a post 10 times per second, and on less active instance it's also not going to be an issue because the parallelity doesn't actually have to come into effect |
You can't know that though. We can't foresee all potential issues. but we know that this approach has potential pitfalls from simple connection problems. I don't see the point of refactoring to a method with includes known risks and does exactly what the designers of http pipelining tell you not to do. PS: I am known to hit the wrong vote and correct it within the same second. Somtimes even setting whole posts as stickies through fat fingers |
This is actually a problem in #4559, it immediately updates
We should definitely do this and use timestamps, because sending sequentially wont use all available server resources. So we need to be able to send in parallel. I dont think there is a general solution here but we need to handle problems individually for each activity type (eg checking the edit timestamp like you say).
I dont see any difference between these. Anyway it could be implemented by storing the community_id in SentActivity table, and then each worker being assigned tasks as
This is implemented in #4559. Also I moved metadata fetches to a background task in #4564. |
Here's an implementation in pseudocode of what i wrote above in point (9). It would be great to get comments (especially from @Nutomic since it's an alternative to their PR) before I implement it properly. To recap, the advantage of doing this on the sending side is that this way the queue is guaranteed to be persistent - if done on the receiving side we either need to add a second persistent queue to the database or have the danger of losing activities since the 200 response happens before anything reaches a storage medium. Backpressure comes from limiting the number of in-flight requests. /// For each community, we keep a queue of sends.
/// Sends happen in parallel for different communities, but in sequence for one community.
/// This is a map that tracks which community we are currently sending an activity for
/// and the dependent activities we can only send afterwards
struct PendingSendsMap {
// the first entry in the map value here is always the activity that is currently being sent
map: HashMap<community_id, VecDequeue<Activity>>;
// total count of activities in all map values so we don't use unbounded memory
total_count: i64;
}
impl PendingSendsMap {
/// Add an activity to the blocklist ONLY if there's already an activity send currently happening for that community
fn push_if_has(self, activity) -> bool {
if let Some(entry) = self.map.entry(activity.community_id) {
self.total_count += 1;
entry.push_back(activity);
true
} else { false }
}
fn pop_next(self, community_id) -> Option<Activity> {
if let Some(entry) = self.map.entry(community_id) {
let activity = entry.pop_front();
assert activity.is_some(); // empty lists can't happen
self.total_count -= 1;
if entry.len() == 0 { self.map.remove(entry); }
activity
} else { None }
}
/// how many activities we currently have in RAM
fn total_in_ram() -> i64 {
self.total_count
}
/// how many activities are currently being sent out (regardless of how many blocked ones are held in RAM)
fn total_in_flight() -> i64 {
self.map.len()
}
}
let blocked = PendingSendsMap::new();
fn sending_loop() {
loop {
while blocked.total_in_ram() >= 1000 || blocked.total_in_flight() >= 10 {
wait_for_one_send_to_complete().await;
}
let Some(activity) = get_next_activity() else {
// everything already sent, end loop.
break;
}
if !blocked.push_if_has(activity) {
spawn(send_task(activity))
}
}
}
fn send_task(activity) {
retry_loop { // as before
http_request(activity).await;
}
// success!
let act = blocked.get(activity).pop_front();
assert act == activity;
if let next = blocked.pop_front() {
spawn(send_task(next));
}
signal_send_complete(); // can be done with a mpsc channel sending `()`
// todo: keep track of last_successful_id
} Note that in real code i would not use community_id anywhere and instead using an abstract "queue id" which can for now be community_id but may later be updated to be coalesce(affected_post_id, affected_community_id) when appropriate for more parallelisation. |
The difference is only in what state is persisted and needs to be kept track of. I imagined (7) as having fully separate persistent queues per community or per post, which means the federation_queue_state table would grow from 1k entries to 100k+ entries, with 100+k tasks reading and writing to that table. 9 on the other hand keeps one persistent queue per instance but within each one creates tiny in-memory queues with a bounded total size (e.g. 100 in the above code). On crashes, the in-memory queue is lost and restarts sending from at most 100 events earlier. |
If we don't care about send order at all (as you do in your PR), this can be solved with a tokio Stream with .buffered(N). That way N futures from the stream happen in parallel but the results retrieved from the stream are still guaranteed to be in order. For my above implementation, I'm not sure yet how to get the lowest successful id cleanly. |
Im not entirely sure how your BlockListMap would work. Instead of that it would also be possible to change Also its probably a bad idea to specify the number of send workers per target instance. I think it makes more sense to have a pool of send workers so you can configure the total number of concurrent, outgoing requests (for all target instances). Then more workers can be assigned to a larger target instance when necessary. This could be implemented by having |
That alone doesn't work, because you don't just need to wait for the current in-flight activity to finish, you also need to wait for other not-yet-inflight activities that are also waiting to finish. So you need a Vec per inflight_requests. That's what the BlockListMap is! a map from community id to list of waiting requests for that community
Maybe but then how exactly do you assign the numbers? How would you prevent a huge instance from starving out smaller instances? |
Also remember that it's not really "workers", rather there's a single permanent task per instance, and then each individual HTTP request is one task. The concurrency is limited per instance, but for instances that are up to date nothing is actually running apart from the permanent task
It definitely can't be unbounded because then you'll just load millions of messages into memory again if an instance is out of date. |
Does it really make sense to have the per-community queue on top of the per-instance queue? Seems like that would be two kinds of parallelism on top of each other. So maybe it would make sense to replace the current InstanceWorker with a CommunityWorker that does essentially the same thing, per community (or per post). Though it would also require a way to cleanup inactive senders. |
That's what I meant with variant 7 vs 9: #4529 (comment) What you describe is not really possible because:
In addition, we do (probably) want to limit requests per-instance and not per-community, which is not possible if each community would be handled fully separately |
An alternative to the tiny in-memory queues per community this is another option I thought of:
But I revised it to the |
Honestly I don't think sending all activities sequentially is the solution. Even a single community can become so large that it becomes too much for a small, remote instance sequentially. Especially when there is another spam attack, or when the target instance needs to catch up after some downtime. We could repeat to split by post_id, but even that may not be enough. The idea of my pr is that things don't need to be in order during sending, but can be put in the correct order by the receiving side. This has unlimited scaling potential simply by increasing the amount of parallel workers, and is also much easier to implement. |
I kinda agree and it's what I thought too before I read your PR code and realized all those complexities and that it's not really simpler. You still need a solution for what happens during server restart or server crash so no activities are lost, and right now you have the same sequential processing per community due to the per-community queue and the issue you have otherwise with activities being processed out-of-order. To fix that you (I think) need the same data structure I made above (it could also be on the receiving side but that's not really any better). |
Maybe the way forward is to purely implement only the fully-parallel sending (limited to N=10 inflight per instance) and go through every activity type and make sure we can make them commutative (basically just skip activities that have been overwritten already (point 5+6 from my list above). |
I kinda disagree with this part though. We can easily move to post_id (community_id not needed except for non-post related actions) and it's very unlikely that we will ever need more. Even if we do, using coalesce(comment_thread_id, post_id, community_id) would still be a possible later change. |
Not true, my pr doesn't have any per-community queue. And handling restarts or crashes works just like before, using last_activity_id (still need to find a way to update that correctly). If an instance is crashed or restarted it will resend some of the same activities again, but those duplicates already get rejected by Lemmy when receiving so it's fine. And activities are put in the correct order on the receiving side. Only problem is now with configuration, having 5 workers per instance could mean 10 instances * 5 workers = 50 workers, or could be 1000 * 5 = 5000. So a shared worker pool for all instances would be better so that you can configure exactly 50 workers to be active at any time. Anyway the pr implementation works and is better than what we have now.
I agree this is better than going by published timestamp. It doesn't require a complicated queue and works even if an activity is out of order by more than a second.
Sure it's possible, but I believe it would result in much more complex code than the above. And I don't see any benefit that would justify the extra complexity. |
Sorry, I misspoke, you need a per-something queue. In your case you added a per-instance queue on the receiving side, which means you solve the network latency problem (this issue, point 2 above) but not the internal latency issue (the one db0 had, point 3 above). If you wanted to solve 3 you'd have to split the receiving queue further while still keeping sequentiality somewhere.
I was talking about crashes or restarts on the receiving side. You respond with 200 before actually having processed anything which means the sender thinks everything has been sent and will never resend it, even if the receiver crashes with many activities only in memory, those are lost. That's why I don't think any of that receiving side things should be added.
My main point is that you already need that in your PR if you want to solve all the problems we have, since you don't remove any sequentiality as opposed to 0.19 at all just network latency. Whether on sending or receiving side. |
Right I also think that the receiving queue is not such a good idea, and better to change individual activity handlers to check the timestamps, so that older edits or votes are ignored. Then the sending side should work in parallel without problems. |
* Generate post thumbnail/metadata in background (ref #4529) * fix api test * Apply suggestions from code review Co-authored-by: SleeplessOne1917 <28871516+SleeplessOne1917@users.noreply.github.com> * fix test --------- Co-authored-by: SleeplessOne1917 <28871516+SleeplessOne1917@users.noreply.github.com>
Thinking about this more, I dont think its possible to handle all activities correctly when they are received in the wrong order. Its fine for post or comment edits as we have can read the existing post and compare timestamps. But imagine you upvote a post and then immediately undo the vote. If another instance receives them in the wrong order, it would be |
Right now maybe, but those could be changed right? For example, currently The same goes for post remove + restore. A post update can't use the published field but it can use the I would consider these part of what I meant above with making them commutative. It doesn't seem too complex to me, though it would probably require thinking through every case individually in detail. It seems like it should be possible to get through all or most by just comparing timestamps. |
Youre right that should work. |
Two questions (hopefully not dumb):
|
Fixed in #4623 |
Requirements
Summary
Problem: Activities are sequential but requires external data to be validated/queried that doesn't come with the request.
Server B -> A, says here is an activity. In that request can be a like/comment/new post. An example of a new post would mean that Server A, to show the post metadata (such as subtitle, or image) queries the new post.
Every one of these outbound requests that the receiving server does are:
Actual Problem
So every activity that results in a remote fetch delays activities. If the total activities that results in more than 1 per 0.6s, servers physically cannot and will never be able to catch up.
As such our decentralised solution to a problem requires a low-latency solution. Without intervention this will evidently ensure that every server will need to exist in only one region. EU or NA or APAC (etc.) (or nothing will exist in APAC, and it will make me sad)
To combat this solution we need to streamline activities and how lemmy handles them.
Steps to Reproduce
Technical Details
Trace 1:
Lemmy has to verify a user (is valid?). So it connects to a their server for information. AU -> X (0.6) + time for server to respond = 2.28s but that is all that happened.
Trace 2:
Similar to the previous trace, but after it verfied the user, it then had to do another
from_json
request to the instance itself. (No caching here?) As you can see 0.74 ends up being the server on the other end responding in a super fast fashion (0.14s) but the handshake + travel time eats up the rest.Trace 3:
Fetching external content. I've seen external servers take upwards of 10 seconds to report data, especially because whenever a fediverse link is shared, every server refreshes it's own data. As such you basically create a mini-dos when you post something.
Trace 4:
Sometimes a lemmy server takes a while to respond for comments.
Version
0.19.3
Lemmy Instance URL
No response
The text was updated successfully, but these errors were encountered: