Skip to content

Commit

Permalink
Use simple caching for DM room lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
exul committed May 14, 2018
1 parent 5e1ece0 commit 8da007f
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 26 deletions.
38 changes: 12 additions & 26 deletions src/matrix-rocketchat/handlers/rocketchat/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,39 +135,23 @@ impl<'a> Forwarder<'a> {
receiver: &UserOnRocketchatServer,
message: &WebhookMessage,
) -> Result<Option<Room>> {
if let Some(room) = self.lookup_existing_direct_message_room(server, receiver, message)? {
let sender_id = self.virtual_user.build_user_id(&message.user_id, &server.id)?;

if let Some(room) = Room::get(
self.config,
self.logger,
self.matrix_api,
message.channel_id.clone(),
sender_id,
receiver.matrix_user_id.clone(),
)? {
self.invite_user_into_direct_message_room(&room, receiver)?;
return Ok(Some(room));
}

self.auto_bridge_direct_message_channel(server, receiver, message)
}

fn lookup_existing_direct_message_room(
&self,
server: &RocketchatServer,
receiver: &UserOnRocketchatServer,
message: &WebhookMessage,
) -> Result<Option<Room>> {
let sender_id = self.virtual_user.build_user_id(&message.user_id, &server.id)?;

// If the user does not exist yet, there is no existing direct message room
if self.matrix_api.get_display_name(sender_id.clone())?.is_none() {
return Ok(None);
}

//TODO: This is highly inefficient and needs some kind of caching, but no persistent storage or alias is needed
for room_id in self.matrix_api.get_joined_rooms(sender_id.clone())? {
let room = Room::new(self.config, self.logger, self.matrix_api, room_id);
let user_ids = room.user_ids(Some(sender_id.clone()))?;
if user_ids.iter().all(|id| id == &sender_id || id == &receiver.matrix_user_id) {
return Ok(Some(room));
}
}

Ok(None)
}

fn invite_user_into_direct_message_room(&self, room: &Room, receiver: &UserOnRocketchatServer) -> Result<()> {
let direct_message_recepient = room.direct_message_matrix_user()?;
if direct_message_recepient.is_none() {
Expand Down Expand Up @@ -228,6 +212,8 @@ impl<'a> Forwarder<'a> {
debug!(self.logger, "Direct message room {} successfully created", &room_id);

let room = Room::new(self.config, self.logger, self.matrix_api, room_id);
room.add_to_cache(message.channel_id.clone(), receiver.matrix_user_id.clone());

Ok(Some(room))
} else {
debug!(
Expand Down
64 changes: 64 additions & 0 deletions src/matrix-rocketchat/models/room.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

Expand All @@ -16,6 +18,11 @@ use models::{RocketchatServer, UserOnRocketchatServer, VirtualUser};
/// The delay in milliseconds between two API requests (to not DOS the server)
pub const API_QUERY_DELAY: u64 = 500;

lazy_static! {
/// Direct room cache
static ref DM_ROOMS: Mutex<HashMap<(String, String), RoomId>> = { Mutex::new(HashMap::new()) };
}

/// A room that is managed by the application service. This can be either a bridged room or an
/// admin room.
pub struct Room<'a> {
Expand Down Expand Up @@ -55,6 +62,63 @@ impl<'a> Room<'a> {
Ok(room_id)
}

/// Get an existing direct message room.
pub fn get(
config: &'a Config,
logger: &'a Logger,
matrix_api: &'a MatrixApi,
channel_id: String,
sender_id: UserId,
receiver_id: UserId,
) -> Result<Option<Room<'a>>> {
// If the user does not exist yet, there is no existing direct message room
if matrix_api.get_display_name(sender_id.clone())?.is_none() {
return Ok(None);
}

match DM_ROOMS.lock() {
Ok(dm_rooms) => match dm_rooms.get(&(channel_id.clone(), receiver_id.to_string())) {
Some(room_id) => {
debug!(logger, "Found room {} for receiver {} in cache", channel_id, receiver_id);
let room = Room::new(config, logger, matrix_api, room_id.clone());
return Ok(Some(room));
}
None => {
debug!(logger, "Room {} for receiver {} not found in cache", channel_id, receiver_id);
}
},
Err(err) => {
warn!(logger, "Could lock DM cache to get room {} with receiver {}: {}", channel_id, receiver_id, err);
}
}

for room_id in matrix_api.get_joined_rooms(sender_id.clone())? {
let room = Room::new(config, logger, matrix_api, room_id);
let user_ids = room.user_ids(Some(sender_id.clone()))?;
if user_ids.iter().all(|id| id == &sender_id || id == &receiver_id) {
room.add_to_cache(channel_id, receiver_id);
return Ok(Some(room));
}
}

Ok(None)
}

/// Add a room to the cache.
/// This will speed-up future direct messages because the direct message room lookup is done via
/// cache instead of going through the users rooms.
pub fn add_to_cache(&self, channel_id: String, receiver_id: UserId) -> () {
match DM_ROOMS.lock() {
Ok(mut dm_rooms) => {
debug!(self.logger, "Adding DM room {} with receiver {} to cache", channel_id, receiver_id);
dm_rooms.insert((channel_id, receiver_id.to_string()), self.id.clone());
}
Err(err) => {
warn!(self.logger, "Could not add DM room {} with receiver {} to cache: {}", channel_id, receiver_id, err);
}
}
}

/// Bridges a room that is already bridged (for other users) for a new user.
pub fn bridge_for_user(&self, user_id: UserId, rocketchat_channel_name: String) -> Result<()> {
debug!(self.logger, "Briding existing room, Rocket.Chat channel: {}", rocketchat_channel_name);
Expand Down

0 comments on commit 8da007f

Please sign in to comment.