Skip to content

Commit

Permalink
Fix up the tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cholcombe973 committed Aug 18, 2018
1 parent 3e6d2f5 commit 5debcab
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 55 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/lib/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct Global {
pub fsid: String, // Uuid Serialize and Deserialize isn't implemented for Uuid
}

#[derive(Debug, Deserialize, Eq, Hash, PartialEq, PartialOrd, Ord, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, PartialOrd, Ord, Serialize)]
pub struct Peer {
pub ip: IpAddr,
pub port: u16,
Expand Down
22 changes: 13 additions & 9 deletions src/lib/layout/distribute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::super::config::Peer;

/// Store files across servers and backend paths. There is no data redundancy
/// when using this
#[derive(Clone, Debug)]
pub struct Distribute {
entry: (Peer, PathBuf),
}
Expand All @@ -28,11 +29,10 @@ impl Node for Distribute {
}
}


#[test]
fn test_distribute() {
use std::net::{IpAddr, Ipv4Addr};
use self::rendezvous_hash::RendezvousNodes;
use std::net::{IpAddr, Ipv4Addr};

let e1 = Distribute {
entry: (
Expand All @@ -53,15 +53,19 @@ fn test_distribute() {
),
};
let mut nodes = RendezvousNodes::default();
nodes.insert(e1);
nodes.insert(e2);
nodes.insert(e1.clone());
nodes.insert(e2.clone());

// FIX ME This is not a perfect distribution
// This should correspond to replica set e1
let r1 = nodes.calc_candidates(&"hello").next().unwrap();
{
let r1 = &nodes.calc_candidates(&"hello").next().unwrap();
assert_eq!(&r1.entry, &e2.entry);
}

// This should correspond to replica set e2
let r2 = nodes.calc_candidates(&"key_foo").next().unwrap();

assert_eq!(r1.entry, &e1.entry);
assert_eq!(r2.entry, &e2.entry);
{
let r2 = nodes.calc_candidates(&"key_foo").next().unwrap();
assert_eq!(&r2.entry, &e2.entry);
}
}
40 changes: 26 additions & 14 deletions src/lib/layout/replicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::super::config::Peer;

/// Store files across a set of servers and paths. Replicate will copy
/// a file X number of times to ensure data redundancy.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Replicate {
entry: Vec<(Peer, PathBuf)>,
}
Expand All @@ -31,8 +31,8 @@ impl Node for Replicate {

#[test]
fn test_replicate() {
use std::net::{IpAddr, Ipv4Addr};
use self::rendezvous_hash::RendezvousNodes;
use std::net::{IpAddr, Ipv4Addr};
// One 2x replica set
let e1 = Replicate {
entry: vec![
Expand Down Expand Up @@ -85,21 +85,33 @@ fn test_replicate() {
],
};
let mut nodes = RendezvousNodes::default();
nodes.insert(e1);
nodes.insert(e2);
nodes.insert(e1.clone());
nodes.insert(e2.clone());

// This should correspond to replica set e1
let replica_set_1 = nodes.calc_candidates(&"hello").next().unwrap();
{
let replica_set_1 = &nodes.calc_candidates(&"hello").next().unwrap();
assert_eq!(
replica_set_1.entry,
vec![
e1.entry[0].clone(),
e1.entry[1].clone(),
e1.entry[2].clone()
],
);
}

// This should correspond to replica set e2
let replica_set_2 = nodes.calc_candidates(&"key_foo").next().unwrap();
{
let replica_set_2 = &nodes.calc_candidates(&"key_foo").next().unwrap();

assert_eq!(
replica_set_1.entry,
vec![&e1.entry[0], &e1.entry[1], &e1.entry[2]],
);
assert_eq!(
replica_set_2.entry,
vec![&e2.entry[0], &e2.entry[1], &e2.entry[2]],
);
assert_eq!(
replica_set_2.entry,
vec![
e2.entry[0].clone(),
e2.entry[1].clone(),
e2.entry[2].clone()
],
);
}
}
7 changes: 7 additions & 0 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ use std::fmt::Debug;
use self::api::service::*;
use self::nix::Errno;

/*
How do streaming operations work?
Could the Actix framework help make some of this work easier?
Delegation is handled by the actors and this could handle the hashing
and sockets.
*/

/*
Each rusix process is made of 'plugins' (PipelinePlugin) stacked on
top of each other in a particular fashion to form a 'graph'.
Expand Down
49 changes: 21 additions & 28 deletions src/pipeline/protocols/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ extern crate zmq;

use std::collections::HashMap;
use std::path::PathBuf;
use std::thread;
use std::thread::JoinHandle;

use self::api::service::*;
use self::futures::Future;
Expand Down Expand Up @@ -32,39 +34,30 @@ impl Client {
// Client is the end of the pipeline.
// send the Fop over to the server(s)
let context = zmq::Context::new();
let client = context.socket(zmq::DEALER).unwrap();
client
.set_identity(self.peer_name.as_bytes())
.expect("failed setting client id");

let mut handles: Vec<JoinHandle<()>> = Vec::new();
for entry in layout {
client
.connect(&format!("tcp://{}:{}", entry.0.ip, entry.0.port))
.expect("failed connecting client");

let request = format!("request #{}", request_nbr);
client
.send(&request, 0)
.expect("client failed sending request");
// TODO: Change over to cpupool?
/*
let t = thread::spawn(|| {
let client = context.socket(zmq::REQ).expect("socket creation failed");
client
.set_identity(self.peer_name.as_bytes())
.expect("failed setting client id");
client
.connect(&format!("tcp://{}:{}", entry.0.ip, entry.0.port))
.expect("failed connecting client");
client.send(&vec![], 0).map_err(|e| e.to_string()).expect("request failed");
});
handles.push(t);
*/
}

loop {
for _ in 0..100 {
if client.poll(zmq::POLLIN, 10).expect("client failed polling") > 0 {
let msg = client
.recv_multipart(0)
.expect("client failed receivng response");
println!("{}", str::from_utf8(&msg[msg.len() - 1]).unwrap());
}
}
request_nbr = request_nbr + 1;
let request = format!("request #{}", request_nbr);
client
.send(&request, 0)
.expect("client failed sending request");
// Wait for completion
for h in handles {
h.join();
}
// Packet sent
//Ok(())
Ok(())
}

fn stop(&self) {}
Expand Down
6 changes: 3 additions & 3 deletions src/pipeline/protocols/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ impl Server {
// send the result back to the client.
fn process_fop(&self, io_type: &Fop, data: &mut FileOperation) -> Result<(), String> {
let context = zmq::Context::new();
let frontend = context.socket(zmq::ROUTER).unwrap();
let mut frontend = context.socket(zmq::ROUTER).unwrap();
frontend
.bind("tcp://*:5570")
.expect("server failed binding frontend");
let backend = context.socket(zmq::DEALER).unwrap();
let mut backend = context.socket(zmq::DEALER).unwrap();
backend
.bind("inproc://backend")
.expect("server failed binding backend");
for _ in 0..5 {
let ctx = context.clone();
thread::spawn(move || server_worker(&ctx));
//thread::spawn(move || server_worker(&ctx));
}
zmq::proxy(&mut frontend, &mut backend).expect("server failed proxying");
Ok(())
Expand Down
8 changes: 8 additions & 0 deletions vscode.code-workspace
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"folders": [
{
"path": "."
}
],
"settings": {}
}

0 comments on commit 5debcab

Please sign in to comment.