Skip to content

Commit

Permalink
Avoid loops when forwarding messages from Matrix to Rocket.Chat
Browse files Browse the repository at this point in the history
  • Loading branch information
exul committed Mar 19, 2017
1 parent 259d9bd commit 1f62cc8
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 34 deletions.
14 changes: 14 additions & 0 deletions src/matrix-rocketchat/db/user.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::{SystemTime, UNIX_EPOCH};

use diesel;
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
Expand Down Expand Up @@ -70,4 +72,16 @@ impl User {
.chain_err(|| ErrorKind::DBSelectError)?;
Ok(users.into_iter().next())
}

/// Update last message sent.
pub fn set_last_message_sent(&self, connection: &SqliteConnection) -> Result<()> {
let last_message_sent = SystemTime::now()
.duration_since(UNIX_EPOCH)
.chain_err(|| ErrorKind::InternalServerError)?
.as_secs() as i64;
diesel::update(users::table.find(&self.matrix_user_id)).set(users::last_message_sent.eq(last_message_sent))
.execute(connection)
.chain_err(|| ErrorKind::DBUpdateError)?;
Ok(())
}
}
5 changes: 5 additions & 0 deletions src/matrix-rocketchat/handlers/events/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,14 @@ impl<'a> Forwarder<'a> {
}
_ => info!(self.logger, format!("Forwarding the type {} is not implemented.", event.event_type)),
}

user_on_rocketchat_server.user(self.connection)?
.set_last_message_sent(self.connection)?;
}
None => debug!(self.logger, "Skipping event, because the room is not bridged"),
}

debug!(self.logger, "Successfully forwarded message to Rocket.Chat server");
Ok(())
}

Expand Down
29 changes: 29 additions & 0 deletions src/matrix-rocketchat/handlers/rocketchat/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::convert::TryFrom;
use std::time::{SystemTime, UNIX_EPOCH};

use diesel::Connection;
use diesel::sqlite::SqliteConnection;
Expand All @@ -12,6 +13,8 @@ use db::{NewUser, NewUserInRoom, NewUserOnRocketchatServer, RocketchatServer, Ro
use errors::*;
use i18n::DEFAULT_LANGUAGE;

const RESEND_THRESHOLD_IN_SECONDS: i64 = 3;

/// Forwards messages from Rocket.Chat to Matrix
pub struct Forwarder<'a> {
/// Application service configuration
Expand Down Expand Up @@ -39,6 +42,12 @@ impl<'a> Forwarder<'a> {
}
};

if !self.is_sendable_message(&user_on_rocketchat_server)? {
debug!(self.logger,
"Skipping message, because the message was just posted by the user Matrix and echoed back from Rocket.Chat");
return Ok(());
}

let room =
match Room::find_by_rocketchat_room_id(self.connection, rocketchat_server.id, message.channel_id.clone())? {
Some(room) => room,
Expand Down Expand Up @@ -122,4 +131,24 @@ impl<'a> Forwarder<'a> {
UserInRoom::insert(self.connection, &new_user_in_room)?;
Ok(())
}

fn is_sendable_message(&self, virtual_user_on_rocketchat_server: &UserOnRocketchatServer) -> Result<bool> {
match UserOnRocketchatServer::find_by_rocketchat_user_id(self.connection,
virtual_user_on_rocketchat_server.rocketchat_server_id,
virtual_user_on_rocketchat_server.rocketchat_user_id
.clone()
.unwrap_or_default(),
false)? {
Some(user_on_rocketchat_server) => {
let user = user_on_rocketchat_server.user(self.connection)?;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.chain_err(|| ErrorKind::InternalServerError)?
.as_secs() as i64;
let last_sent = now - user.last_message_sent;
Ok(last_sent < RESEND_THRESHOLD_IN_SECONDS)
}
None => Ok(true),
}
}
}
55 changes: 21 additions & 34 deletions tests/forward_rocketchat_to_matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ fn successfully_forwards_a_text_message_from_rocketchat_to_matrix_when_the_user_
matrix_router.put(SendMessageEventEndpoint::router_path(), message_forwarder, "send_message_event");
matrix_router.post(InviteUserEndpoint::router_path(), invite_forwarder, "invite_user");
matrix_router.post(JoinRoomByIdEndpoint::router_path(), join_forwarder, "join_room_id");
matrix_router.put(SetDisplayNameEndpoint::router_path(),
set_display_name_forwarder,
"set_display_name");
matrix_router.put(SetDisplayNameEndpoint::router_path(), set_display_name_forwarder, "set_display_name");

let mut channels = HashMap::new();
channels.insert("spec_channel", vec!["spec_user"]);
Expand Down Expand Up @@ -145,13 +143,8 @@ fn successfully_forwards_a_text_message_from_rocketchat_to_matrix_when_the_user_
matrix_router.put(SendMessageEventEndpoint::router_path(), message_forwarder, "send_message_event");
matrix_router.post(InviteUserEndpoint::router_path(), invite_forwarder, "invite_user");
matrix_router.post(JoinRoomByIdEndpoint::router_path(), join_forwarder, "join_room_id");
matrix_router.put(SetDisplayNameEndpoint::router_path(),
set_display_name_forwarder,
"set_display_name");

matrix_router.put(SetDisplayNameEndpoint::router_path(), set_display_name_forwarder, "set_display_name");

let mut channels = HashMap::new();
channels.insert("spec_channel", vec!["spec_user"]);

let test = Test::new()
.with_matrix_routes(matrix_router)
Expand All @@ -166,8 +159,8 @@ fn successfully_forwards_a_text_message_from_rocketchat_to_matrix_when_the_user_
token: Some(RS_TOKEN.to_string()),
channel_id: "spec_channel_id".to_string(),
channel_name: "spec_channel".to_string(),
user_id: "spec_user_id".to_string(),
user_name: "spec_user".to_string(),
user_id: "virtual_spec_user_id".to_string(),
user_name: "virtual_spec_user".to_string(),
text: "spec_message".to_string(),
};
let payload = to_string(&message).unwrap();
Expand All @@ -176,7 +169,7 @@ fn successfully_forwards_a_text_message_from_rocketchat_to_matrix_when_the_user_

// receive the invite message
let invite_message = invite_receiver.recv_timeout(default_timeout()).unwrap();
assert!(invite_message.contains("@rocketchat_spec_user_id_1:localhost"));
assert!(invite_message.contains("@rocketchat_virtual_spec_user_id_1:localhost"));

// discard admin room join
join_receiver.recv_timeout(default_timeout()).unwrap();
Expand Down Expand Up @@ -223,15 +216,15 @@ fn successfully_forwards_a_text_message_from_rocketchat_to_matrix_when_the_user_
// the virtual user was create with the Rocket.Chat user ID because the exiting matrix user
// cannot be used since the application service can only impersonate virtual users.
let user_on_rocketchat = UserOnRocketchatServer::find(&connection, new_user_id, rocketchat_server_id).unwrap();
assert_eq!(user_on_rocketchat.rocketchat_user_id.unwrap(), "spec_user_id".to_string());
assert_eq!(user_on_rocketchat.rocketchat_user_id.unwrap(), "virtual_spec_user_id".to_string());

let second_message = Message {
message_id: "spec_id_2".to_string(),
token: Some(RS_TOKEN.to_string()),
channel_id: "spec_channel_id".to_string(),
channel_name: "spec_channel".to_string(),
user_id: "spec_user_id".to_string(),
user_name: "spec_user".to_string(),
user_id: "virtual_spec_user_id".to_string(),
user_name: "virtual_spec_user".to_string(),
text: "spec_message 2".to_string(),
};
let second_payload = to_string(&second_message).unwrap();
Expand All @@ -249,9 +242,7 @@ fn successfully_forwards_a_text_message_from_rocketchat_to_matrix_when_the_user_
fn update_the_display_name_when_the_user_changed_it_on_the_rocketchat_server() {
let (set_display_name_forwarder, set_display_name_receiver) = MessageForwarder::new();
let mut matrix_router = Router::new();
matrix_router.put(SetDisplayNameEndpoint::router_path(),
set_display_name_forwarder,
"set_display_name");
matrix_router.put(SetDisplayNameEndpoint::router_path(), set_display_name_forwarder, "set_display_name");


let mut channels = HashMap::new();
Expand All @@ -270,8 +261,8 @@ fn update_the_display_name_when_the_user_changed_it_on_the_rocketchat_server() {
token: Some(RS_TOKEN.to_string()),
channel_id: "spec_channel_id".to_string(),
channel_name: "spec_channel".to_string(),
user_id: "spec_user_id".to_string(),
user_name: "spec_user".to_string(),
user_id: "virtual_spec_user_id".to_string(),
user_name: "virtual_spec_user".to_string(),
text: "spec_message".to_string(),
};
let payload = to_string(&message).unwrap();
Expand All @@ -286,8 +277,8 @@ fn update_the_display_name_when_the_user_changed_it_on_the_rocketchat_server() {
token: Some(RS_TOKEN.to_string()),
channel_id: "spec_channel_id".to_string(),
channel_name: "spec_channel".to_string(),
user_id: "spec_user_id".to_string(),
user_name: "spec_user_new".to_string(),
user_id: "virtual_spec_user_id".to_string(),
user_name: "virtual_spec_user_new".to_string(),
text: "spec_message 2".to_string(),
};
let second_payload_with_new_username = to_string(&second_message_with_new_username).unwrap();
Expand All @@ -302,12 +293,11 @@ fn update_the_display_name_when_the_user_changed_it_on_the_rocketchat_server() {
let rocketchat_server_id = admin_room.rocketchat_server_id.unwrap();
let user_on_rocketchat_server = UserOnRocketchatServer::find_by_rocketchat_user_id(&connection,
rocketchat_server_id,
"spec_user_id".to_string(),
"virtual_spec_user_id".to_string(),
true)
.unwrap()
.unwrap();
assert_eq!(user_on_rocketchat_server.rocketchat_username.unwrap(),
"spec_user_new".to_string());
assert_eq!(user_on_rocketchat_server.rocketchat_username.unwrap(), "virtual_spec_user_new".to_string());
}

#[test]
Expand All @@ -322,9 +312,6 @@ fn message_is_forwarded_even_if_setting_the_display_name_failes() {
},
"set_display_name");

let mut channels = HashMap::new();
channels.insert("spec_channel", vec!["spec_user"]);

let test = Test::new()
.with_matrix_routes(matrix_router)
.with_rocketchat_mock()
Expand All @@ -338,8 +325,8 @@ fn message_is_forwarded_even_if_setting_the_display_name_failes() {
token: Some(RS_TOKEN.to_string()),
channel_id: "spec_channel_id".to_string(),
channel_name: "spec_channel".to_string(),
user_id: "spec_user_id".to_string(),
user_name: "spec_user".to_string(),
user_id: "virtual_spec_user_id".to_string(),
user_name: "virtual_spec_user".to_string(),
text: "spec_message".to_string(),
};
let payload = to_string(&message).unwrap();
Expand Down Expand Up @@ -468,8 +455,8 @@ fn returns_unauthorized_when_the_rs_token_is_missing() {
token: None,
channel_id: "spec_channel_id".to_string(),
channel_name: "spec_channel".to_string(),
user_id: "spec_user_id".to_string(),
user_name: "spec_user".to_string(),
user_id: "virtual_spec_user_id".to_string(),
user_name: "virtual_spec_user".to_string(),
text: "spec_message".to_string(),
};
let payload = to_string(&message).unwrap();
Expand All @@ -490,8 +477,8 @@ fn returns_forbidden_when_the_rs_token_does_not_match_a_server() {
token: Some("wrong_token".to_string()),
channel_id: "spec_channel_id".to_string(),
channel_name: "spec_channel".to_string(),
user_id: "spec_user_id".to_string(),
user_name: "spec_user".to_string(),
user_id: "virtual_spec_user_id".to_string(),
user_name: "virtual_spec_user".to_string(),
text: "spec_message".to_string(),
};
let payload = to_string(&message).unwrap();
Expand Down

0 comments on commit 1f62cc8

Please sign in to comment.