Skip to content

Commit

Permalink
WIP: wake up the room loader worker thread immediately
Browse files Browse the repository at this point in the history
This noticeably reduces the delay before seeing messages when switching
to a new room.
  • Loading branch information
Benjamin-L committed Oct 30, 2023
1 parent 8943909 commit 0908bc4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
4 changes: 3 additions & 1 deletion src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,9 @@ impl ChatStore {

/// Mark a room for loading more scrollback.
pub fn mark_for_load(&mut self, room_id: OwnedRoomId) {
self.need_load.insert(room_id);
if self.need_load.insert(room_id) {
self.worker.notify_need_load();
}
}

/// Get the [RoomInfo] for a given room identifier.
Expand Down
31 changes: 21 additions & 10 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::time::{Duration, Instant};
use futures::{stream::FuturesUnordered, StreamExt};
use gethostname::gethostname;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{error, warn};

Expand Down Expand Up @@ -192,11 +193,11 @@ async fn load_plan(store: &AsyncProgramStore) -> HashMap<OwnedRoomId, Option<Str
let ChatStore { need_load, rooms, .. } = &mut locked.application;
let mut plan = HashMap::new();

for room_id in std::mem::take(need_load).into_iter() {
for room_id in need_load.iter() {
let info = rooms.get_or_default(room_id.clone());

// TODO: not sure I understand this bit?
if info.recently_fetched() || info.fetching {
need_load.insert(room_id);
continue;
} else {
info.fetch_last = Instant::now().into();
Expand All @@ -209,7 +210,7 @@ async fn load_plan(store: &AsyncProgramStore) -> HashMap<OwnedRoomId, Option<Str
RoomFetchStatus::NotStarted => None,
};

plan.insert(room_id, fetch_id);
plan.insert(room_id.clone(), fetch_id);
}

return plan;
Expand Down Expand Up @@ -280,12 +281,10 @@ async fn load_insert(
}

info.fetch_id = fetch_id.map_or(RoomFetchStatus::Done, RoomFetchStatus::HaveMore);
need_load.remove(&room_id);
},
Err(e) => {
warn!(room_id = room_id.as_str(), err = e.to_string(), "Failed to load older messages");

// Wait and try again.
need_load.insert(room_id);
},
}
}
Expand All @@ -311,13 +310,15 @@ async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize {
.await
}

async fn load_older_forever(client: &Client, store: &AsyncProgramStore) {
async fn load_older_forever(notify_need_load: &Notify, client: &Client, store: &AsyncProgramStore) {
// Load older messages every 2 seconds.
let mut interval = tokio::time::interval(Duration::from_secs(2));
// TODO: maybe change MissedTickBehavior

loop {
interval.tick().await;
notify_need_load.notified().await;
load_older(client, store).await;
// interval.tick().await;
}
}

Expand Down Expand Up @@ -531,6 +532,7 @@ impl Debug for WorkerTask {
pub struct Requester {
pub client: Client,
pub tx: UnboundedSender<WorkerTask>,
notify_need_load: Arc<Notify>,
}

impl Requester {
Expand Down Expand Up @@ -617,6 +619,11 @@ impl Requester {

return response.recv();
}

/// TODO: doc
pub fn notify_need_load(&self) {
self.notify_need_load.notify_waiters();
}
}

pub struct ClientWorker {
Expand All @@ -625,11 +632,13 @@ pub struct ClientWorker {
client: Client,
load_handle: Option<JoinHandle<()>>,
sync_handle: Option<JoinHandle<()>>,
notify_need_load: Arc<Notify>,
}

impl ClientWorker {
pub async fn spawn(settings: ApplicationSettings) -> Requester {
let (tx, rx) = unbounded_channel();
let notify_need_load = Arc::new(Notify::new());
let account = &settings.profile;

let req_timeout = Duration::from_secs(settings.tunables.request_timeout);
Expand Down Expand Up @@ -663,13 +672,14 @@ impl ClientWorker {
client: client.clone(),
load_handle: None,
sync_handle: None,
notify_need_load: Arc::clone(&notify_need_load),
};

tokio::spawn(async move {
worker.work(rx).await;
});

return Requester { client, tx };
return Requester { client, tx, notify_need_load };
}

async fn work(&mut self, mut rx: UnboundedReceiver<WorkerTask>) {
Expand Down Expand Up @@ -1039,9 +1049,10 @@ impl ClientWorker {

self.load_handle = tokio::spawn({
let client = self.client.clone();
let notify_need_load = Arc::clone(&self.notify_need_load);

async move {
let load = load_older_forever(&client, &store);
let load = load_older_forever(&notify_need_load, &client, &store);
let rcpt = send_receipts_forever(&client, &store);
let room = refresh_rooms_forever(&client, &store);
let ((), (), ()) = tokio::join!(load, rcpt, room);
Expand Down

0 comments on commit 0908bc4

Please sign in to comment.