New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of mob server #23

Merged
merged 2 commits into from Mar 22, 2018
File filter...
Filter file types
Jump to file or symbol
Failed to load files and symbols.
+59 −113
Diff settings

Always

Just for now

Next

Remove need for connection state and tick per loop

At the end of each event loop tick the server would do an O(n) scan of
all connections to check their state and remove them from the slab if
they were reset. Now, the connection no longer has to track connection
state and the connection is removed when there is an error.
  • Loading branch information...
hjr3 committed Mar 9, 2018
commit 485487217ddde7d316d7c7b0ac9057696278bc43
Copy path View file
@@ -23,14 +23,9 @@ pub struct Connection {
interest: Ready,

// messages waiting to be sent out
// TODO: should use VecDequeue?
send_queue: Vec<Rc<Vec<u8>>>,

// track whether a connection needs to be (re)registered
is_idle: bool,

// track whether a connection is reset
is_reset: bool,

// track whether a read received `WouldBlock` and store the number of
// byte we are supposed to read
read_continuation: Option<u64>,
@@ -47,8 +42,6 @@ impl Connection {
token: token,
interest: Ready::from(UnixReady::hup()),
send_queue: Vec::new(),
is_idle: true,
is_reset: false,
read_continuation: None,
write_continuation: false,
}
@@ -62,7 +55,7 @@ impl Connection {
/// listening connections.
pub fn readable(&mut self) -> io::Result<Option<Vec<u8>>> {

let msg_len = match try!(self.read_message_length()) {
let msg_len = match self.read_message_length()? {
None => { return Ok(None); },
Some(n) => n,
};
@@ -75,9 +68,9 @@ impl Connection {
let msg_len = msg_len as usize;

debug!("Expected message length is {}", msg_len);

// Here we allocate and set the length with unsafe code. The risks of this are discussed
// at https://stackoverflow.com/a/30979689/329496 and are mitigated as recv_buf is
// at https://stackoverflow.com/a/30979689/329496 and are mitigated as recv_buf is
// abandoned below if we don't read msg_leg bytes from the socket
let mut recv_buf : Vec<u8> = Vec::with_capacity(msg_len);
unsafe { recv_buf.set_len(msg_len); }
@@ -89,6 +82,7 @@ impl Connection {
Ok(n) => {
debug!("CONN : we read {} bytes", n);

// TODO handle a read continuation here
if n < msg_len as usize {
return Err(Error::new(ErrorKind::InvalidData, "Did not read enough bytes"));
}
@@ -150,7 +144,7 @@ impl Connection {
/// flush until the kernel sends back EAGAIN?
pub fn writable(&mut self) -> io::Result<()> {

try!(self.send_queue.pop()
self.send_queue.pop()
.ok_or(Error::new(ErrorKind::Other, "Could not pop send queue"))
.and_then(|buf| {
match self.write_message_length(&buf) {
@@ -197,8 +191,7 @@ impl Connection {
}
}
}
})
);
})?;

if self.send_queue.is_empty() {
self.interest.remove(Ready::writable());
@@ -246,16 +239,14 @@ impl Connection {
/// This will cause the connection to register interests in write events with the poller.
/// The connection can still safely have an interest in read events. The read and write buffers
/// operate independently of each other.
pub fn send_message(&mut self, message: Rc<Vec<u8>>) -> io::Result<()> {
pub fn send_message(&mut self, message: Rc<Vec<u8>>) {
trace!("connection send_message; token={:?}", self.token);

self.send_queue.push(message);

if !self.interest.is_writable() {
self.interest.insert(Ready::writable());
}

Ok(())
}

/// Register interest in read events with poll.
@@ -271,10 +262,7 @@ impl Connection {
self.token,
self.interest,
PollOpt::edge() | PollOpt::oneshot()
).and_then(|(),| {
self.is_idle = false;
Ok(())
}).or_else(|e| {
).or_else(|e| {
error!("Failed to reregister {:?}, {:?}", self.token, e);
Err(e)
})
@@ -289,34 +277,9 @@ impl Connection {
self.token,
self.interest,
PollOpt::edge() | PollOpt::oneshot()
).and_then(|(),| {
self.is_idle = false;
Ok(())
}).or_else(|e| {
).or_else(|e| {
error!("Failed to reregister {:?}, {:?}", self.token, e);
Err(e)
})
}

pub fn mark_reset(&mut self) {
trace!("connection mark_reset; token={:?}", self.token);

self.is_reset = true;
}

#[inline]
pub fn is_reset(&self) -> bool {
self.is_reset
}

pub fn mark_idle(&mut self) {
trace!("connection mark_idle; token={:?}", self.token);

self.is_idle = true;
}

#[inline]
pub fn is_idle(&self) -> bool {
self.is_idle
}
}
Copy path View file
@@ -15,7 +15,7 @@ pub struct Server {
// main socket for our server
sock: TcpListener,

// token of our server. we keep track of it here instead of doing `const SERVER = Token(0)`.
// token of our server. we keep track of it here instead of doing `const SERVER = Token(_)`.
token: Token,

// a list of connections _accepted_ by our server
@@ -34,8 +34,7 @@ impl Server {
// track an internal offset, but does not anymore.
token: Token(10_000_000),

// SERVER is Token(1), so start after that
// we can deal with a max of 126 connections
// We will handle a max of 128 connections
conns: Slab::with_capacity(128),

// list of events from the poller that the server needs to process
@@ -45,11 +44,11 @@ impl Server {

pub fn run(&mut self, poll: &mut Poll) -> io::Result<()> {

try!(self.register(poll));
self.register(poll)?;

info!("Server run loop starting...");
loop {
let cnt = try!(poll.poll(&mut self.events, None));
let cnt = poll.poll(&mut self.events, None)?;

let mut i = 0;

@@ -70,8 +69,6 @@ impl Server {

i += 1;
}

self.tick(poll);
}
}

@@ -90,50 +87,37 @@ impl Server {
})
}

fn tick(&mut self, poll: &mut Poll) {
trace!("Handling end of tick");

let mut reset_tokens = Vec::new();

for c in self.conns.iter_mut() {
if c.is_reset() {
reset_tokens.push(c.token);
} else if c.is_idle() {
c.reregister(poll)
.unwrap_or_else(|e| {
warn!("Reregister failed {:?}", e);
c.mark_reset();
reset_tokens.push(c.token);
});
/// Remove a token from the slab
fn remove_token(&mut self, token: Token) {
match self.conns.remove(token) {
Some(_c) => {
debug!("reset connection; token={:?}", token);
}
}

for token in reset_tokens {
match self.conns.remove(token) {
Some(_c) => {
debug!("reset connection; token={:?}", token);
}
None => {
warn!("Unable to remove connection for {:?}", token);
}
None => {
warn!("Unable to remove connection for {:?}", token);
}
}
}

fn ready(&mut self, poll: &mut Poll, token: Token, event: Ready) {
debug!("{:?} event = {:?}", token, event);

if self.token != token && self.conns.contains(token) == false {
debug!("Failed to find connection for {:?}", token);
return;
}

let event = UnixReady::from(event);

if event.is_error() {
warn!("Error event for {:?}", token);
self.find_connection_by_token(token).mark_reset();
self.remove_token(token);
return;
}

if event.is_hup() {
trace!("Hup event for {:?}", token);
self.find_connection_by_token(token).mark_reset();
self.remove_token(token);
return;
}

@@ -145,18 +129,14 @@ impl Server {
trace!("Write event for {:?}", token);
assert!(self.token != token, "Received writable event for Server");

let conn = self.find_connection_by_token(token);

if conn.is_reset() {
info!("{:?} has already been reset", token);
return;
}

conn.writable()
.unwrap_or_else(|e| {
match self.connection(token).writable() {
Ok(()) => {},
Err(e) => {
warn!("Write event failed for {:?}, {:?}", token, e);
conn.mark_reset();
});
self.remove_token(token);
return;
}
}
}

// A read event for our `Server` token means we are establishing a new connection. A read
@@ -166,22 +146,26 @@ impl Server {
if self.token == token {
self.accept(poll);
} else {

if self.find_connection_by_token(token).is_reset() {
info!("{:?} has already been reset", token);
return;
}

self.readable(token)
.unwrap_or_else(|e| {
match self.readable(token) {
Ok(()) => {},
Err(e) => {
warn!("Read event failed for {:?}: {:?}", token, e);
self.find_connection_by_token(token).mark_reset();
});
self.remove_token(token);
return;
}
}
}
}

if self.token != token {
self.find_connection_by_token(token).mark_idle();
match self.connection(token).reregister(poll) {
Ok(()) => {},
Err(e) => {
warn!("Reregister failed {:?}", e);
self.remove_token(token);
return;
}
}
}
}

@@ -209,7 +193,6 @@ impl Server {

let token = match self.conns.vacant_entry() {
Some(entry) => {
debug!("registering {:?} with poller", entry.index());
let c = Connection::new(sock, entry.index());
entry.insert(c).index()
}
@@ -219,11 +202,12 @@ impl Server {
}
};

match self.find_connection_by_token(token).register(poll) {
debug!("registering {:?} with poller", token);
match self.connection(token).register(poll) {
Ok(_) => {},
Err(e) => {
error!("Failed to register {:?} connection with poller, {:?}", token, e);
self.conns.remove(token);
self.remove_token(token);
}
}
}
@@ -237,24 +221,23 @@ impl Server {
fn readable(&mut self, token: Token) -> io::Result<()> {
debug!("server conn readable; token={:?}", token);

while let Some(message) = try!(self.find_connection_by_token(token).readable()) {
while let Some(message) = self.connection(token).readable()? {

let rc_message = Rc::new(message);
// Queue up a write for all connected clients.
for c in self.conns.iter_mut() {
c.send_message(rc_message.clone())
.unwrap_or_else(|e| {
error!("Failed to queue message for {:?}: {:?}", c.token, e);
c.mark_reset();
});
c.send_message(rc_message.clone());
}
}

Ok(())
}

/// Find a connection in the slab using the given token.
fn find_connection_by_token(&mut self, token: Token) -> &mut Connection {
///
/// This function will panic if the token does not exist. Use self.conns.contains(token)
/// before using this function.
fn connection(&mut self, token: Token) -> &mut Connection {
&mut self.conns[token]
}
}
ProTip! Use n and p to navigate between commits in a pull request.