Skip to content

Commit

Permalink
Add scheduled thread pool
Browse files Browse the repository at this point in the history
Signed-off-by: trivernis <trivernis@protonmail.com>
  • Loading branch information
Trivernis committed Sep 12, 2020
1 parent 085ed6e commit 49d5458
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ crc = "1.8.1"
serde = "1.0.115"
byteorder = "1.3.4"
log = "0.4.11"
crossbeam-utils = "0.7.2"
crossbeam-utils = "0.7.2"
scheduled-thread-pool = "0.2.5"
num_cpus = "1.13.0"
16 changes: 12 additions & 4 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, channel};
use crossbeam_utils::sync::WaitGroup;
use std::mem;
use scheduled_thread_pool::ScheduledThreadPool;

const BUF_SIZE: usize = 512;

Expand Down Expand Up @@ -46,11 +47,18 @@ impl RpcServer {
/// Starts the RPC server
pub fn start(&mut self) -> io::Result<()> {
let listener = TcpListener::bind(&self.address)?;
let pool = ScheduledThreadPool::new(num_cpus::get());
for stream in listener.incoming() {
log::trace!("Connection received.");
match stream {
Ok(s) => if let Err(e) = self.handle_message(s) {
log::trace!("Error handling message {}", e.to_string())
Ok(s) => {
let sender = Sender::clone(&self.sender);
log::trace!("Scheduling message to be handled by thread pool");
pool.execute(|| {
if let Err(e) = Self::handle_message(sender, s) {
log::trace!("Error handling message {}", e.to_string())
}
});
},
Err(e) => log::trace!("TCP Error {}", e.to_string())
}
Expand All @@ -60,7 +68,7 @@ impl RpcServer {
}

/// Handles a message
fn handle_message(&mut self, mut incoming: TcpStream) -> io::Result<()> {
fn handle_message(sender: Sender<Arc<Mutex<MessageHandler>>>, mut incoming: TcpStream) -> io::Result<()> {
let mut length_raw = [0u8; 4];
incoming.read_exact(&mut length_raw)?;
let length = BigEndian::read_u32(&length_raw);
Expand All @@ -85,7 +93,7 @@ impl RpcServer {
wg: WaitGroup::clone(&wg),
response: None,
}));
self.sender.send(Arc::clone(&handler)).unwrap();
sender.send(Arc::clone(&handler)).unwrap();
wg.wait();
if let Some(response) = mem::replace(&mut handler.lock().unwrap().response, None) {
incoming.write(&response.to_bytes())?;
Expand Down

0 comments on commit 49d5458

Please sign in to comment.