Skip to content

Commit

Permalink
Merge pull request #5 from steveklabnik/rustfmt
Browse files Browse the repository at this point in the history
Run rustfmt
  • Loading branch information
NicolasLM committed Oct 26, 2015
2 parents 59a5a95 + cfd0d2a commit daa8f74
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 122 deletions.
18 changes: 8 additions & 10 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub trait GetBackend {

pub struct RoundRobinBackend {
backends: Vec<SocketAddr>,
last_used: usize
last_used: usize,
}

impl RoundRobinBackend {
Expand All @@ -22,7 +22,7 @@ impl RoundRobinBackend {
}
Ok(RoundRobinBackend {
backends: backends,
last_used: 0
last_used: 0,
})
}
}
Expand All @@ -35,13 +35,13 @@ impl GetBackend for RoundRobinBackend {
self.last_used = (self.last_used + 1) % self.backends.len();
self.backends.get(self.last_used).map(|b| b.clone())
}

fn add(&mut self, backend_str: &str) -> Result<(), AddrParseError> {
let backend_socket_addr: SocketAddr = try!(FromStr::from_str(&backend_str));
self.backends.push(backend_socket_addr);
Ok(())
}

fn remove(&mut self, backend_str: &str) -> Result<(), AddrParseError> {
let backend_socket_addr: SocketAddr = try!(FromStr::from_str(&backend_str));
self.backends.retain(|&x| x != backend_socket_addr);
Expand All @@ -57,8 +57,7 @@ mod tests {

#[test]
fn test_rrb_backend() {
let backends_str = vec!["127.0.0.1:6000".to_string(),
"127.0.0.1:6001".to_string()];
let backends_str = vec!["127.0.0.1:6000".to_string(), "127.0.0.1:6001".to_string()];
let mut rrb = RoundRobinBackend::new(backends_str).unwrap();
assert_eq!(2, rrb.backends.len());

Expand All @@ -70,7 +69,7 @@ mod tests {
assert_eq!(second_socket_addr, fourth_socket_addr);
assert!(first_socket_addr != second_socket_addr);
}

#[test]
fn test_empty_rrb_backend() {
let backends_str = vec![];
Expand All @@ -91,8 +90,7 @@ mod tests {

#[test]
fn test_remove_from_rrb_backend() {
let backends_str = vec!["127.0.0.1:6000".to_string(),
"127.0.0.1:6001".to_string()];
let backends_str = vec!["127.0.0.1:6000".to_string(), "127.0.0.1:6001".to_string()];
let mut rrb = RoundRobinBackend::new(backends_str).unwrap();
assert!(rrb.remove("327.0.0.1:6000").is_err());
assert_eq!(2, rrb.backends.len());
Expand All @@ -101,5 +99,5 @@ mod tests {
assert!(rrb.remove("127.0.0.1:6000").is_ok());
assert_eq!(1, rrb.backends.len());
}

}
45 changes: 23 additions & 22 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#[macro_use] extern crate log;
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate argparse;
extern crate mio;
Expand All @@ -25,24 +26,23 @@ fn main() {
let mut ap = ArgumentParser::new();
ap.set_description("Dynamic TCP load balancer");

ap.refer(&mut servers)
.add_argument("server", Collect, "Servers to load balance");
ap.refer(&mut servers).add_argument("server", Collect, "Servers to load balance");

ap.refer(&mut bind)
.add_option(&["-b", "--bind"], Store,
"Bind the load balancer to address:port (127.0.0.1:8000)");
ap.refer(&mut bind).add_option(&["-b", "--bind"],
Store,
"Bind the load balancer to address:port (127.0.0.1:8000)");

ap.refer(&mut redis_url)
.add_option(&["-r", "--redis"], Store,
"URL of Redis database (redis://localhost)");
ap.refer(&mut redis_url).add_option(&["-r", "--redis"],
Store,
"URL of Redis database (redis://localhost)");

ap.refer(&mut disable_redis)
.add_option(&["--no-redis"], StoreTrue,
"Disable updates of backend through Redis");
ap.refer(&mut log_level)
.add_option(&["-l", "--log"], Store,
"Log level [debug, info, warn, error] (info)");
ap.refer(&mut disable_redis).add_option(&["--no-redis"],
StoreTrue,
"Disable updates of backend through Redis");

ap.refer(&mut log_level).add_option(&["-l", "--log"],
Store,
"Log level [debug, info, warn, error] (info)");

ap.parse_args_or_exit();
}
Expand All @@ -56,16 +56,17 @@ fn main() {
exit(1);
}

let backend = Arc::new(Mutex::new(
backend::RoundRobinBackend::new(servers).unwrap()
));
let backend = Arc::new(Mutex::new(backend::RoundRobinBackend::new(servers).unwrap()));

let mut proxy = tcplb::Proxy::new(&bind, backend.clone());
let mut proxy = tcplb::Proxy::new(&bind, backend.clone());
let mut event_loop = EventLoop::new().unwrap();

// Register interest in notifications of new connections
event_loop.register_opt(&proxy.listen_sock, Token(1), EventSet::readable(),
PollOpt::edge()).unwrap();
event_loop.register_opt(&proxy.listen_sock,
Token(1),
EventSet::readable(),
PollOpt::edge())
.unwrap();

if !disable_redis {
sync::create_sync_thread(backend.clone(), redis_url);
Expand Down
23 changes: 12 additions & 11 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use std::thread;

use backend::{RoundRobinBackend, GetBackend};

pub fn create_sync_thread(backend: Arc<Mutex<RoundRobinBackend>>,
redis_url: String) {
pub fn create_sync_thread(backend: Arc<Mutex<RoundRobinBackend>>, redis_url: String) {
thread::spawn(move || {
let pubsub = subscribe_to_redis(&redis_url).unwrap();
loop {
Expand All @@ -25,27 +24,29 @@ fn subscribe_to_redis(url: &str) -> redis::RedisResult<redis::PubSub> {
Ok(pubsub)
}

fn handle_message(backend: Arc<Mutex<RoundRobinBackend>>, msg: redis::Msg) -> redis::RedisResult<()> {
fn handle_message(backend: Arc<Mutex<RoundRobinBackend>>,
msg: redis::Msg)
-> redis::RedisResult<()> {
let channel = msg.get_channel_name();
let payload: String = try!(msg.get_payload());
debug!("New message on Redis channel {}: '{}'", channel, payload);

match channel {
"backend_add" => {
let mut backend = backend.lock().unwrap();
let mut backend = backend.lock().unwrap();
match backend.add(&payload) {
Ok(_) => info!("Added new server {}", payload),
_ => {}
};
},
}
}
"backend_remove" => {
let mut backend = backend.lock().unwrap();
let mut backend = backend.lock().unwrap();
match backend.remove(&payload) {
Ok(_) => info!("Removed server {}", payload),
_ => {}
};
},
_ => info!("Cannot parse Redis message")
};
}
}
_ => info!("Cannot parse Redis message"),
}
Ok(())
}
Loading

0 comments on commit daa8f74

Please sign in to comment.