From 2876ff5e710688b34f1c2f32d1db641f232f4754 Mon Sep 17 00:00:00 2001 From: Nicolas Le Manchet Date: Sun, 18 Oct 2015 23:30:20 +0200 Subject: [PATCH] Initial commit Working TCP load balancer synchronized with Redis to track changes to backend servers. --- .gitignore | 1 + Cargo.lock | 124 +++++++++++ Cargo.toml | 10 + LICENSE | 23 ++ README.md | 86 ++++++++ src/backend.rs | 105 +++++++++ src/main.rs | 74 +++++++ src/simplelogger.rs | 32 +++ src/sync.rs | 51 +++++ src/tcplb.rs | 525 ++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 1031 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 src/backend.rs create mode 100644 src/main.rs create mode 100644 src/simplelogger.rs create mode 100644 src/sync.rs create mode 100644 src/tcplb.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eb5a316 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..f3eaf6f --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,124 @@ +[root] +name = "nucleon" +version = "0.0.1" +dependencies = [ + "argparse 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "redis 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "argparse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bitflags" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "byteorder" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bytes" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] 
+name = "clock_ticks" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "libc" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "log" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "matches" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "mio" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "clock_ticks 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "nix 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "nix" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "redis" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rustc-serialize 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", + "sha1 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "url 0.2.37 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rustc-serialize" +version = 
"0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "sha1" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "url" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "matches 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "winapi" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..21b7f29 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "nucleon" +version = "0.0.1" +authors = ["Nicolas Le Manchet "] + +[dependencies] +log = "0.3.1" +argparse = "0.2.1" +mio = "0.4.3" +redis = "0.5.1" diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..20ad057 --- /dev/null +++ b/LICENSE @@ -0,0 +1,23 @@ +Copyright (c) 2015, Nicolas Le Manchet +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. 
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..3d39aaa
--- /dev/null
+++ b/README.md
@@ -0,0 +1,86 @@
+Nucleon
+=======
+
+Nucleon is a dynamic TCP load balancer written in Rust. It has the ability to
+insert and remove backend servers on the fly. To do that it leverages the
+[Redis Pub/Sub](http://redis.io/topics/pubsub) mechanism. Adding a server to a
+cluster, or removing one from it, is as easy as publishing a message to Redis.
+
+How to build it
+---------------
+
+All you need to build it is [Rust
+1.3](https://doc.rust-lang.org/stable/book/installing-rust.html).
+
+Just go into the repository and issue:
+
+    $ cargo build --release
+
+Usage
+-----
+
+Nucleon can be used with or without a Redis database. When run without Redis it
+is not possible to add or remove load balanced servers without restarting the
+process.
+
+```
+Usage:
+    nucleon [OPTIONS] [SERVER ...]
+
+Dynamic TCP load balancer
+
+positional arguments:
+  server                Servers to load balance
+
+optional arguments:
+  -h,--help             show this help message and exit
+  -b,--bind BIND        Bind the load balancer to address:port (127.0.0.1:8000)
+  -r,--redis REDIS      URL of Redis database (redis://localhost)
+  --no-redis            Disable updates of backend through Redis
+  -l,--log LOG          Log level [debug, info, warn, error] (info)
+```
+
+Imagine you have two web servers to load balance, and a local Redis. Run the
+load balancer with:
+
+    nucleon --bind 0.0.0.0:8000 10.0.0.1:80 10.0.0.2:80
+
+Now imagine that you want to scale up your infrastructure by spawning a new web
+server at 10.0.0.3. Just send a message to the Redis channel `backend_add`:
+
+    redis:6379> PUBLISH backend_add 10.0.0.3:80
+    (integer) 1
+
+You will see in the logs:
+
+    INFO - Load balancing server V4(10.0.0.1:80)
+    INFO - Load balancing server V4(10.0.0.2:80)
+    INFO - Now listening on 0.0.0.0:8000
+    INFO - Subscribed to Redis channels 'backend_add' and 'backend_remove'
+    INFO - Added new server 10.0.0.3:80
+
+If you decide that you do not need server 2 any longer:
+
+    redis:6379> PUBLISH backend_remove 10.0.0.2:80
+    (integer) 1
+
+How does it perform?
+--------------------
+
+Surprisingly well. A quick comparison with HA Proxy in TCP mode with a single
+backend containing a single server using iperf results in:
+
+| Connections | HA Proxy     | Nucleon       |
+| -----------:| ------------:| -------------:|
+| 1           | 15.1 Gbits/s | 15.7 Gbits/s  |
+| 10          | 13.5 Gbits/s | 11.3 Gbits/s  |
+| 100         | 8.9 Gbits/s  | 10.5 Gbits/s  |
+
+Keep in mind that this is a really simple test, far from what real life traffic
+looks like. A real benchmark should compare short lived connections with long
+running ones, etc.
+
+Licence
+-------
+
+BSD 2-Clause
diff --git a/src/backend.rs b/src/backend.rs
new file mode 100644
index 0000000..3171dc2
--- /dev/null
+++ b/src/backend.rs
@@ -0,0 +1,105 @@
+use std::net::{SocketAddr, AddrParseError};
+use std::str::FromStr;
+
+pub trait GetBackend {
+    fn get(&mut self) -> Option<SocketAddr>;
+    fn add(&mut self, backend_str: &str) -> Result<(), AddrParseError>;
+    fn remove(&mut self, backend_str: &str) -> Result<(), AddrParseError>;
+}
+
+pub struct RoundRobinBackend {
+    backends: Vec<SocketAddr>,
+    last_used: usize
+}
+
+impl RoundRobinBackend {
+    pub fn new(backends_str: Vec<String>) -> Result<RoundRobinBackend, AddrParseError> {
+        let mut backends = Vec::new();
+        for backend_str in backends_str {
+            let backend_socket_addr: SocketAddr = try!(FromStr::from_str(&backend_str));
+            backends.push(backend_socket_addr);
+            info!("Load balancing server {:?}", backend_socket_addr);
+        }
+        Ok(RoundRobinBackend {
+            backends: backends,
+            last_used: 0
+        })
+    }
+}
+
+impl GetBackend for RoundRobinBackend {
+    fn get(&mut self) -> Option<SocketAddr> {
+        if self.backends.is_empty() {
+            return None;
+        }
+        self.last_used = (self.last_used + 1) % self.backends.len();
+        self.backends.get(self.last_used).map(|b| b.clone())
+    }
+
+    fn add(&mut self, backend_str: &str) -> Result<(), AddrParseError> {
+        let backend_socket_addr: SocketAddr = try!(FromStr::from_str(&backend_str));
+        self.backends.push(backend_socket_addr);
+        Ok(())
+    }
+
+    fn remove(&mut self, backend_str: &str) -> Result<(), AddrParseError> {
+        let backend_socket_addr: SocketAddr = try!(FromStr::from_str(&backend_str));
+        self.backends.retain(|&x| x != backend_socket_addr);
+        Ok(())
+    }
+}
+
+
+#[cfg(test)]
+mod tests {
+    use std::net::{SocketAddr, AddrParseError};
+    use super::{RoundRobinBackend, GetBackend};
+
+    #[test]
+    fn test_rrb_backend() {
+        let backends_str = vec!["127.0.0.1:6000".to_string(),
+                                "127.0.0.1:6001".to_string()];
+        let mut rrb = RoundRobinBackend::new(backends_str).unwrap();
+        assert_eq!(2, rrb.backends.len());
+
+        let first_socket_addr = rrb.get().unwrap();
+        let
second_socket_addr = rrb.get().unwrap(); + let third_socket_addr = rrb.get().unwrap(); + let fourth_socket_addr = rrb.get().unwrap(); + assert_eq!(first_socket_addr, third_socket_addr); + assert_eq!(second_socket_addr, fourth_socket_addr); + assert!(first_socket_addr != second_socket_addr); + } + + #[test] + fn test_empty_rrb_backend() { + let backends_str = vec![]; + let mut rrb = RoundRobinBackend::new(backends_str).unwrap(); + assert_eq!(0, rrb.backends.len()); + assert!(rrb.get().is_none()); + } + + #[test] + fn test_add_to_rrb_backend() { + let mut rrb = RoundRobinBackend::new(vec![]).unwrap(); + assert!(rrb.get().is_none()); + assert!(rrb.add("327.0.0.1:6000").is_err()); + assert!(rrb.get().is_none()); + assert!(rrb.add("127.0.0.1:6000").is_ok()); + assert!(rrb.get().is_some()); + } + + #[test] + fn test_remove_from_rrb_backend() { + let backends_str = vec!["127.0.0.1:6000".to_string(), + "127.0.0.1:6001".to_string()]; + let mut rrb = RoundRobinBackend::new(backends_str).unwrap(); + assert!(rrb.remove("327.0.0.1:6000").is_err()); + assert_eq!(2, rrb.backends.len()); + assert!(rrb.remove("127.0.0.1:6000").is_ok()); + assert_eq!(1, rrb.backends.len()); + assert!(rrb.remove("127.0.0.1:6000").is_ok()); + assert_eq!(1, rrb.backends.len()); + } + +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..e2bf99b --- /dev/null +++ b/src/main.rs @@ -0,0 +1,74 @@ +#[macro_use] extern crate log; +extern crate argparse; +extern crate mio; + +mod backend; +mod sync; +mod tcplb; +mod simplelogger; + +use std::sync::{Arc, Mutex}; +use std::process::exit; + +use argparse::{ArgumentParser, StoreTrue, Store, Collect}; +use mio::*; + +fn main() { + let mut servers: Vec = Vec::new(); + let mut bind = "127.0.0.1:8000".to_string(); + let mut redis_url = "redis://localhost".to_string(); + let mut disable_redis = false; + let mut log_level = "info".to_string(); + + { + let mut ap = ArgumentParser::new(); + ap.set_description("Dynamic TCP load balancer"); + + 
ap.refer(&mut servers) + .add_argument("server", Collect, "Servers to load balance"); + + ap.refer(&mut bind) + .add_option(&["-b", "--bind"], Store, + "Bind the load balancer to address:port (127.0.0.1:8000)"); + + ap.refer(&mut redis_url) + .add_option(&["-r", "--redis"], Store, + "URL of Redis database (redis://localhost)"); + + ap.refer(&mut disable_redis) + .add_option(&["--no-redis"], StoreTrue, + "Disable updates of backend through Redis"); + + ap.refer(&mut log_level) + .add_option(&["-l", "--log"], Store, + "Log level [debug, info, warn, error] (info)"); + + ap.parse_args_or_exit(); + } + + simplelogger::init(&log_level).ok().expect("Failed to init logger"); + + if servers.is_empty() { + println!("Need at least one server to load balance"); + exit(1); + } + + let mut backend = Arc::new(Mutex::new( + backend::RoundRobinBackend::new(servers).unwrap() + )); + + let mut proxy = tcplb::Proxy::new(&bind, backend.clone()); + let mut event_loop = EventLoop::new().unwrap(); + + // Register interest in notifications of new connections + event_loop.register_opt(&proxy.listen_sock, Token(1), EventSet::readable(), + PollOpt::edge()).unwrap(); + + if !disable_redis { + sync::create_sync_thread(backend.clone(), redis_url); + } + + // Start handling events + event_loop.run(&mut proxy).unwrap(); + +} diff --git a/src/simplelogger.rs b/src/simplelogger.rs new file mode 100644 index 0000000..a9ce886 --- /dev/null +++ b/src/simplelogger.rs @@ -0,0 +1,32 @@ +extern crate log; + +use log::{LogRecord, LogLevel, LogMetadata, SetLoggerError, LogLevelFilter}; + +struct SimpleLogger; + +impl log::Log for SimpleLogger { + fn enabled(&self, metadata: &LogMetadata) -> bool { + // Todo: implement this or use an existing library + true + } + + fn log(&self, record: &LogRecord) { + if self.enabled(record.metadata()) { + println!("{} - {}", record.level(), record.args()); + } + } +} + +pub fn init(level: &str) -> Result<(), SetLoggerError> { + let log_level_filter = match level { + "error" 
=> LogLevelFilter::Error, + "warn" => LogLevelFilter::Warn, + "info" => LogLevelFilter::Info, + "debug" => LogLevelFilter::Debug, + _ => panic!("Unknown log level {}", level) + }; + log::set_logger(|max_log_level| { + max_log_level.set(log_level_filter); + Box::new(SimpleLogger) + }) +} diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 0000000..f852caf --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,51 @@ +extern crate redis; + +use std::sync::{Arc, Mutex}; +use std::thread; + +use backend::{RoundRobinBackend, GetBackend}; + +pub fn create_sync_thread(backend: Arc>, + redis_url: String) { + thread::spawn(move || { + let pubsub = subscribe_to_redis(&redis_url).unwrap(); + loop { + let msg = pubsub.get_message().unwrap(); + handle_message(backend.clone(), msg); + } + }); +} + +fn subscribe_to_redis(url: &str) -> redis::RedisResult { + let client = try!(redis::Client::open(url)); + let mut pubsub: redis::PubSub = try!(client.get_pubsub()); + try!(pubsub.subscribe("backend_add")); + try!(pubsub.subscribe("backend_remove")); + info!("Subscribed to Redis channels 'backend_add' and 'backend_remove'"); + Ok(pubsub) +} + +fn handle_message(backend: Arc>, msg: redis::Msg) -> redis::RedisResult<()> { + let channel = msg.get_channel_name(); + let payload: String = try!(msg.get_payload()); + debug!("New message on Redis channel {}: '{}'", channel, payload); + + match channel { + "backend_add" => { + let mut backend = backend.lock().unwrap(); + match backend.add(&payload) { + Ok(_) => info!("Added new server {}", payload), + _ => {} + }; + }, + "backend_remove" => { + let mut backend = backend.lock().unwrap(); + match backend.remove(&payload) { + Ok(_) => info!("Removed server {}", payload), + _ => {} + }; + }, + _ => info!("Cannot parse Redis message") + }; + Ok(()) +} diff --git a/src/tcplb.rs b/src/tcplb.rs new file mode 100644 index 0000000..5717234 --- /dev/null +++ b/src/tcplb.rs @@ -0,0 +1,525 @@ +extern crate mio; + +use std::io; +use std::net::SocketAddr; 
+use std::str::FromStr; +use std::sync::{Arc, Mutex}; + +use mio::*; +use mio::buf::ByteBuf; +use mio::tcp::{TcpListener, TcpStream}; +use mio::util::Slab; + +use std::collections::VecDeque; +use std::ops::Drop; +use std::ptr; + +use backend::{RoundRobinBackend, GetBackend}; + +const buffer_size: usize = 8192; +const max_buffers_per_connection: usize = 16; +const max_connections: usize = 512; + +pub struct Proxy { + // socket where incoming connections arrive + pub listen_sock: TcpListener, + + // token of the listening socket + token: Token, + + // backend containing server to proxify + backend: Arc>, + + // slab of Connections (front and back ends) + connections: Slab, + + // queue of tokens waiting to be read + readable_tokens: VecDeque +} + +impl Proxy { + + /// Create an instance of our proxy + pub fn new(listen_addr: &str, backend: Arc>) -> Proxy { + let listen_addr: SocketAddr = FromStr::from_str(&listen_addr) + .ok().expect("Failed to parse listen host:port string"); + let listen_sock = TcpListener::bind(&listen_addr).unwrap(); + info!("Now listening on {}", &listen_addr); + Proxy { + listen_sock: listen_sock, + token: Token(1), + backend: backend, + connections: Slab::new_starting_at(Token(2), max_connections), + readable_tokens: VecDeque::with_capacity(max_connections) + } + } + + /// Put a token in the list of readable tokens + /// + /// Once token is in the list, it is no longer interested in readable events + /// from the event_loop. + fn push_to_readable_tokens(&mut self, event_loop: &mut EventLoop, token: Token) { + self.readable_tokens.push_back(token); + self.connections[token].interest.remove(EventSet::readable()); + self.connections[token].reregister(event_loop); + } + + /// Read as much as it can of a token + /// + /// Stops when we read everything the kernel had for us or the other end send + /// queue is full. 
+ fn read_token(&mut self, event_loop: &mut EventLoop, + token: Token) -> io::Result { + let other_end_token = self.connections[token].end_token.unwrap(); + let buffers_to_read = + max_buffers_per_connection - self.connections[other_end_token].send_queue.len(); + let (exhausted_kernel, messages) = try!( + self.find_connection_by_token(token).read(buffers_to_read) + ); + // let's not tell we have something to write if we don't + if messages.is_empty() { + return Ok(exhausted_kernel); + } + + self.connections[other_end_token].send_messages(messages) + .and_then(|_| { + self.connections[other_end_token].interest.insert(EventSet::writable()); + self.connections[other_end_token].reregister(event_loop) + }) + .unwrap_or_else(|e| { + error!("Failed to queue message for {:?}: {:?}", other_end_token, e); + }); + self.connections[token].reregister(event_loop); + Ok(exhausted_kernel) + } + + /// Try to flush the list of readable tokens + /// + /// Loops on all readable tokens and remove them from the list if we flushed + /// completely the kernel buffer. 
+ fn flush_readable_tokens(&mut self, event_loop: &mut EventLoop) { + for _ in 0..self.readable_tokens.len() { + match self.readable_tokens.pop_front() { + Some(token) => { + match self.read_token(event_loop, token) { + Ok(exhausted_kernel) => { + if exhausted_kernel { + self.connections[token].interest.insert(EventSet::readable()); + self.connections[token].reregister(event_loop); + } else { + self.readable_tokens.push_back(token); + } + }, + Err(e) => panic!("Error while reading {:?}: {}", token, e) + } + }, + None => break + } + } + } + + /// Flush the send queue of a token + /// + /// Drop the connection if we sent everything and other end is gone + fn handle_write_event(&mut self, event_loop: &mut EventLoop, + token: Token) { + match self.connections[token].write() { + Ok(flushed_everything) => { + if flushed_everything { + self.connections[token].interest.remove(EventSet::writable()); + self.connections[token].reregister(event_loop); + } + }, + Err(e) => { + error!("Could not write on {:?}, dropping send queue", token); + self.connections[token].send_queue.clear(); + } + } + + // Terminate connection if other end is gone and send queue if flushed + if (self.connections[token].send_queue.is_empty() && + self.connections[token].end_token.is_none()) { + self.terminate_connection(event_loop, token); + } + } + + /// Accept all pending connections + fn accept(&mut self, event_loop: &mut EventLoop) { + loop { + if !self.accept_one(event_loop) { + break; + } + } + } + + /// Accept a single pending connections + /// + /// Once connection is accepted, it creates a new connection to the backend + /// and links both connections together. + /// + /// Returns true when a connection is accepted successfully, + /// false when no more connection to accept or error happened. 
+ fn accept_one(&mut self, event_loop: &mut EventLoop) -> bool { + let client_sock = match self.listen_sock.accept() { + Ok(s) => { + match s { + Some(sock) => sock, + None => { + debug!("No more socket to accept on this event"); + return false; + } + } + }, + Err(e) => { + error!("Failed to accept new socket, {:?}", e); + return false; + } + }; + match client_sock.peer_addr() { + Ok(client_addr) => info!("New client connection from {}", client_addr), + Err(_) => info!("New client connection from unknown source") + } + let backend_socket_addr = self.backend.lock().unwrap().get().unwrap(); + let mut backend_sock = match TcpStream::connect(&backend_socket_addr) { + Ok(backend_sock) => { + info!("Connected to backend {}", backend_socket_addr); + backend_sock + }, + Err(e) => { + error!("Could not connect to backend: {}", e); + return false; + } + }; + let client_token = self.create_connection_from_sock(client_sock, event_loop); + let backend_token = self.create_connection_from_sock(backend_sock, event_loop); + match client_token { + Some(client_token) => { + match backend_token { + Some(backend_token) => { + self.link_connections_together(client_token, backend_token, + event_loop); + }, + None => { + error!("Cannot create backend Connection, dropping client"); + self.connections.remove(client_token); + return false; + } + } + }, + None => { + match backend_token { + Some(backend_token) => { + error!("Cannot create client Connection, dropping backend"); + self.connections.remove(backend_token); + return false; + }, + None => { + error!("Cannot create client nor backend Connection"); + return false; + } + } + } + }; + + true + + } + + /// Create a Connection instance from a socket + fn create_connection_from_sock(&mut self, sock: TcpStream, + event_loop: &mut EventLoop) -> Option { + self.connections.insert_with(|token| { + info!("Creating Connection with {:?}", token); + let mut connection = Connection::new(sock, token); + connection.register(event_loop); + connection 
+ }) + } + + /// Link two Connection together + /// + /// This makes it easy to know to which Connection send the data we + /// receive on another Connection. + /// A Connection is not interested in read events before being linked correctly + fn link_connections_together(&mut self, client_token: Token, backend_token: Token, + event_loop: &mut EventLoop) { + self.connections[client_token].end_token = Some(backend_token); + self.connections[backend_token].end_token = Some(client_token); + // Now that we have two Connections linked with each other + // we can register to Read events. + self.connections[client_token].interest.insert(EventSet::readable()); + self.connections[backend_token].interest.insert(EventSet::readable()); + self.connections[client_token].reregister(event_loop); + self.connections[backend_token].reregister(event_loop); + } + + /// Terminate a connection as well as its other end + /// + /// Makes sure to flush all pending queues before dropping the other end + /// of a connection. + fn terminate_connection(&mut self, event_loop: &mut EventLoop, token: Token) { + match self.connections[token].end_token { + Some(end_token) => { + if self.connections[end_token].send_queue.is_empty() { + // Nothing to write on the other end, we can drop it + self.connections[end_token].deregister(event_loop); + self.connections.remove(end_token); + } else { + // We still need to write things in the other end + // just stop reading it and we will terminate it + // when we flushed its send_queue + // Todo: Is there a way to schedule a timeout? + self.connections[end_token].end_token = None; + self.connections[end_token].interest.remove(EventSet::readable()); + self.connections[end_token].interest.insert(EventSet::writable()); + self.connections[end_token].reregister(event_loop); + } + }, + None => {} + } + self.connections[token].deregister(event_loop); + self.connections.remove(token); + } + + /// Find a connection in the slab using the given token. 
+ fn find_connection_by_token<'a>(&'a mut self, token: Token) -> &'a mut Connection { + &mut self.connections[token] + } +} + +impl Handler for Proxy { + type Timeout = (); + type Message = (); + + /// Method called when a event from the event loop is notified + fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { + debug!("events [{:?}] on {:?}", events, token); + + // we are only interested in read events from the listening token + // so we can safely assume this is a read event + if token == self.token { + self.accept(event_loop); + info!("Accepted connection(s), now {} Connections", + self.connections.count()); + return; + } + + if events.is_error() { + debug!("Got an error on {:?}", token); + self.terminate_connection(event_loop, token); + return; + } + + if events.is_writable() { + debug!("Got a write event on {:?}", token); + self.handle_write_event(event_loop, token); + } + + if events.is_hup() && events.is_readable() { + debug!("Got a read hang up on {:?}", token); + // bypass the readable tokens queue, let's read until kernel + // is exhausted and drop the connection + self.read_token(event_loop, token); + self.terminate_connection(event_loop, token); + } else if events.is_readable() { + debug!("Got a read event on {:?}", token); + self.push_to_readable_tokens(event_loop, token); + } else if events.is_hup() { + debug!("Got a hup event on {:?}", token); + } + + self.flush_readable_tokens(event_loop); + + debug!("Finished loop with {} Connections and {} readable tokens", + self.connections.count(), self.readable_tokens.len()); + + } +} + +struct Connection { + // handle to the accepted socket + sock: TcpStream, + + // token used to register with the event loop + token: Token, + + // set of events we are interested in + interest: EventSet, + + // messages waiting to be sent out to sock + send_queue: VecDeque, + + // other end of the tunnel + end_token: Option, +} + +impl Drop for Connection { + fn drop(&mut self) { + 
info!("Dropping Connection with {:?}", self.token); + } +} + +impl Connection { + fn new(sock: TcpStream, token: Token) -> Connection { + Connection { + sock: sock, + token: token, + + // new connections are only listening for a hang up event when + // they are first created. We always want to make sure we are + // listening for the hang up event. We will additionally listen + // for readable and writable events later on. + interest: EventSet::hup() | EventSet::error(), + + send_queue: VecDeque::with_capacity(max_buffers_per_connection), + + // When instanciated a Connection does not have yet an other end + end_token: None + } + } + + /// Read buffers from the kernel + /// + /// Reads at most 'nb_buffers' buffers. + /// + /// Returns a tuple of (bool, Vec) wrapped in a Result. + /// True if kernel has no more data to give, false otherwise. + /// Vec contains the actual data read by chunks of buffers. + fn read(&mut self, nb_buffers: usize) -> io::Result<(bool, Vec)> { + + let mut recv_vec: Vec = Vec::with_capacity(nb_buffers); + let mut exhausted_kernel: bool = false; + + for _ in 0..nb_buffers { + let mut recv_buf = ByteBuf::mut_with_capacity(buffer_size); + match self.sock.try_read_buf(&mut recv_buf) { + // the socket receive buffer is empty, so let's move on + // try_read_buf internally handles WouldBlock here too + Ok(None) => { + debug!("CONN : we read 0 bytes, exhausted kernel"); + exhausted_kernel = true; + break; + }, + Ok(Some(n)) => { + debug!("CONN : we read {} bytes", n); + if n == 0 { + // Reading on a closed socket never gives Ok(None)... 
+ // Todo: check why + exhausted_kernel = true; + break; + } + if n > 0 { + // flip changes our type from MutByteBuf to ByteBuf + recv_vec.push(recv_buf.flip()); + } + }, + Err(e) => { + error!("Failed to read buffer for {:?}, error: {}", self.token, e); + return Err(e); + } + } + } + + Ok((exhausted_kernel, recv_vec)) + } + + /// Try to flush all send queue + /// + /// Returns true when everything is flushed, false otherwise + fn write(&mut self) -> io::Result { + while !self.send_queue.is_empty() { + let wrote_everything = try!(self.write_one_buf()); + if !wrote_everything { + // Kernel did not accept all our data, let's keep + // interest on write events so we get notified when + // kernel is ready to accept our data in the future. + return Ok(false); + } + } + + Ok(true) + } + + /// Write one buffer to the socket + /// + /// Returns true if the totality of the buffer was sent, + /// false if the kernel did not accept everything. + fn write_one_buf(&mut self) -> io::Result { + + self.send_queue.pop_front() + .ok_or(io::Error::new(io::ErrorKind::Other, "Could not pop send queue")) + .and_then(|mut buf| { + match self.sock.try_write_buf(&mut buf) { + Ok(None) => { + debug!("client flushing buf; WouldBlock"); + + // put message back into the queue so we can try again + self.send_queue.push_front(buf); + Ok(false) + }, + Ok(Some(n)) => { + debug!("CONN : we wrote {} bytes", n); + if buf.has_remaining() { + self.send_queue.push_front(buf); + Ok(false) + } else { + Ok(true) + } + }, + Err(e) => { + error!("Failed to send buffer for {:?}, error: {}", self.token, e); + Err(e) + } + } + }) + + } + + /// Queue an outgoing message to the client + /// + /// This will cause the connection to register interests in write + /// events with the event loop. 
+ fn send_messages(&mut self, messages: Vec) -> io::Result<()> { + // Todo: use Vec.append() but not in Rust stable + self.send_queue.extend(messages.into_iter()); + self.interest.insert(EventSet::writable()); + Ok(()) + } + + /// Register interest in read events with the event_loop + /// + /// This will let the event loop get notified on events happening on + /// this connection. + fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + event_loop.register_opt( + &self.sock, + self.token, + self.interest, + PollOpt::edge() | PollOpt::oneshot() + ).or_else(|e| { + error!("Failed to register {:?}, {:?}", self.token, e); + Err(e) + }) + } + + /// Re-register interest in events with the event_loop + fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + event_loop.reregister( + &self.sock, + self.token, + self.interest, + PollOpt::edge() | PollOpt::oneshot() + ).or_else(|e| { + error!("Failed to reregister {:?}, {:?}", self.token, e); + Err(e) + }) + } + + /// De-register every interest in events for this connection + fn deregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + event_loop.deregister(&self.sock).or_else(|e| { + error!("Failed to deregister {:?}, {:?}", self.token, e); + Err(e) + }) + } +}