Skip to content

Commit

Permalink
Merge 3d72bd3 into e8ec295
Browse files Browse the repository at this point in the history
  • Loading branch information
tikue committed May 10, 2016
2 parents e8ec295 + 3d72bd3 commit ae0a2e1
Show file tree
Hide file tree
Showing 27 changed files with 2,683 additions and 1,945 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
target
Cargo.lock
.cargo
*.swp
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ before_script:
script:
- |
(cd tarpc && travis-cargo build) &&
(cd tarpc && travis-cargo test)
travis-cargo build && travis-cargo test
after_success:
- (cd tarpc && travis-cargo coveralls --no-sudo)
- travis-cargo coveralls --no-sudo

env:
global:
Expand Down
13 changes: 12 additions & 1 deletion tarpc/Cargo.toml → Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,23 @@ description = "An RPC framework for Rust with a focus on ease of use."

[dependencies]
bincode = "0.5"
bytes = "0.3"
byteorder = "0.5"
log = "0.3"
mio = "0.5"
quick-error = "1.0"
scoped-pool = "0.1"
serde = "0.7"
env_logger = "0.3"
unix_socket = "0.5"

[dev-dependencies]
lazy_static = "0.1"
lazy_static = "0.2"
env_logger = "0.3"
tempdir = "0.3"

[profile.release]
debug = true

[features]
nightly = ["serde/nightly"]
82 changes: 82 additions & 0 deletions benches/single_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.

#![cfg_attr(test, feature(test))]

#[cfg(test)]
#[macro_use]
extern crate lazy_static;

#[cfg(test)]
#[macro_use]
extern crate tarpc;

#[macro_use]
extern crate log;

#[cfg(test)]
#[allow(dead_code)] // generated Client isn't used in this benchmark
mod benchmark {
extern crate env_logger;
extern crate test;

use tarpc::protocol::{ServeHandle, client};
use self::test::Bencher;
use std::sync::{Arc, Mutex};

service! {
rpc hello(s: String) -> String;
}

struct HelloServer;
impl Service for HelloServer {
fn hello(&mut self, ctx: Ctx, s: String) {
ctx.hello(&s).unwrap();
}
}

// Prevents resource exhaustion when benching
lazy_static! {
static ref HANDLES: Arc<Mutex<Vec<ServeHandle>>> = {
let handles = (0..2).map(|_| HelloServer.spawn("localhost:0").unwrap()).collect();
Arc::new(Mutex::new(handles))
};
static ref CLIENTS: Arc<Mutex<Vec<Client>>> = {
let lock = HANDLES.lock().unwrap();
let registry = client::Dispatcher::spawn().unwrap();
let clients = (0..35).map(|i| Client::register(lock[i % lock.len()].local_addr(), &registry).unwrap()).collect();
Arc::new(Mutex::new(clients))
};
}

#[bench]
fn hello(bencher: &mut Bencher) {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
let _ = env_logger::init();
let clients = CLIENTS.lock().unwrap();
let mut clients = clients.iter().cycle();
let concurrency = 1200;
let mut count = 0;
let finished = Arc::new(AtomicUsize::new(0));
let bob = "Bob".to_string();
let current = ::std::thread::current();
bencher.iter(|| {
let fin = finished.clone();
let cur = current.clone();
clients.next().unwrap().hello(move |_reply| {
_reply.unwrap();
fin.fetch_add(1, Ordering::SeqCst);
cur.unpark();
}, &bob).unwrap();
count += 1;
if count % concurrency == 0 {
while finished.load(Ordering::SeqCst) < count {
::std::thread::park();
}
}
});
}
}
56 changes: 56 additions & 0 deletions examples/single_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.

#[macro_use]
extern crate tarpc;

extern crate env_logger;
#[macro_use]
extern crate log;

extern crate mio;

use std::net::ToSocketAddrs;
use std::time::{Duration, Instant};

service! {
rpc hello(buf: Vec<u8>) -> Vec<u8>;
}

struct HelloServer;
impl Service for HelloServer {
#[inline]
fn hello(&mut self, ctx: Ctx, buf: Vec<u8>) {
ctx.hello(&buf).unwrap();
}
}

fn main() {
let _ = env_logger::init();
let addr = "127.0.0.1:58765".to_socket_addrs().unwrap().next().unwrap();
let handle = HelloServer.spawn(addr).unwrap();
let client = FutureClient::spawn(&addr).unwrap();
let concurrency = 100;
let mut futures = Vec::with_capacity(concurrency);

info!("Starting...");
let start = Instant::now();
let max = Duration::from_secs(10);
let mut total_rpcs = 0;
let buf = vec![1; 1028];

while start.elapsed() < max {
for _ in 0..concurrency {
futures.push(client.hello(&buf));
}
for f in futures.drain(..) {
f.get().unwrap();
}
total_rpcs += concurrency;
}
info!("Done. Total rpcs in 10s: {}", total_rpcs);
client.shutdown().unwrap();
handle.shutdown().unwrap();
}
75 changes: 75 additions & 0 deletions examples/throughput.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#[macro_use]
extern crate tarpc;
extern crate env_logger;

use std::time;
use std::net;
use std::thread;
use std::io::{Read, Write};

fn gen_vec(size: usize) -> Vec<u8> {
let mut vec: Vec<u8> = Vec::with_capacity(size);
for i in 0..size {
vec.push((i % 1 << 8) as u8);
}
vec
}

service! {
rpc read(size: u32) -> Vec<u8>;
}

struct Server;

impl Service for Server {
fn read(&mut self, ctx: Ctx, size: u32) {
ctx.read(&gen_vec(size as usize)).unwrap();
}
}

const CHUNK_SIZE: u32 = 1 << 18;

fn bench_tarpc(target: u64) {
let handle = Server.spawn("0.0.0.0:0").unwrap();
let client = BlockingClient::spawn(handle.local_addr()).unwrap();
let start = time::Instant::now();
let mut nread = 0;
while nread < target {
client.read(&CHUNK_SIZE).unwrap();
nread += CHUNK_SIZE as u64;
}
let duration = time::Instant::now() - start;
println!("TARPC: {}MB/s",
(target as f64 / (1024f64 * 1024f64)) /
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9));
}

fn bench_tcp(target: u64) {
let l = net::TcpListener::bind("0.0.0.0:0").unwrap();
let addr = l.local_addr().unwrap();
thread::spawn(move || {
let (mut stream, _) = l.accept().unwrap();
let mut vec = gen_vec(CHUNK_SIZE as usize);
while let Ok(_) = stream.write_all(&vec[..]) {
vec = gen_vec(CHUNK_SIZE as usize);
}
});
let mut stream = net::TcpStream::connect(&addr).unwrap();
let mut buf = vec![0; CHUNK_SIZE as usize];
let start = time::Instant::now();
let mut nread = 0;
while nread < target {
stream.read_exact(&mut buf[..]).unwrap();
nread += CHUNK_SIZE as u64;
}
let duration = time::Instant::now() - start;
println!("TCP: {}MB/s",
(target as f64 / (1024f64 * 1024f64)) /
(duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 10E9));
}

fn main() {
let _ = env_logger::init();
bench_tarpc(256 << 20);
bench_tcp(256 << 20);
}
70 changes: 70 additions & 0 deletions examples/two_clients.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#[macro_use]
extern crate log;
#[macro_use]
extern crate tarpc;
extern crate serde;
extern crate mio;
extern crate bincode;
extern crate env_logger;
use mio::*;
use tarpc::protocol::{client, server};

mod bar {
service! {
rpc bar(i: i32) -> i32;
}
}

struct Bar;
impl bar::Service for Bar {
fn bar(&mut self, ctx: bar::Ctx, i: i32) {
ctx.bar(&i).unwrap();
}
}

mod baz {
service! {
rpc baz(s: String) -> String;
}
}

struct Baz;
impl baz::Service for Baz {
fn baz(&mut self, ctx: baz::Ctx, s: String) {
ctx.baz(&format!("Hello, {}!", s)).unwrap();
}
}

macro_rules! pos {
() => (concat!(file!(), ":", line!()))
}

use bar::Service as BarService;
use baz::Service as BazService;

fn main() {
let _ = env_logger::init();
let server_registry = server::Dispatcher::spawn().unwrap();
let bar = Bar.register("localhost:0", &server_registry).unwrap();
let baz = Baz.register("localhost:0", &server_registry).unwrap();

info!("About to create Clients");
let client_registry = client::Dispatcher::spawn().unwrap();
let bar_client = bar::BlockingClient::register(bar.local_addr(), &client_registry).unwrap();
let baz_client = baz::BlockingClient::register(baz.local_addr(), &client_registry).unwrap();

info!("Result: {:?}", bar_client.bar(&17));

let total = 20;
for i in 1..(total+1) {
if i % 2 == 0 {
info!("Result 1: {:?}", bar_client.bar(&i));
} else {
info!("Result 2: {:?}", baz_client.baz(&i.to_string()));
}
}

info!("Done.");
client_registry.shutdown().expect(pos!());
server_registry.shutdown().expect(pos!());
}
28 changes: 13 additions & 15 deletions hooks/pre-push
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ run_cargo() {
else
VERB=Testing
fi
if [ "$3" != "" ]; then
printf "${PREFIX} $VERB $2 on $3 ... "
rustup run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
if [ "$2" != "" ]; then
printf "${PREFIX} $VERB on $2... "
if [ "$2" != "nightly" ]; then
rustup run $2 cargo $1 &>/dev/null
else
rustup run nightly cargo $1 --features nightly &>/dev/null
fi
else
printf "${PREFIX} $VERB $2 ... "
cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
printf "${PREFIX} $VERB... "
cargo $1 &>/dev/null
fi
if [ "$?" != "0" ]; then
printf "${FAILURE}\n"
Expand Down Expand Up @@ -104,18 +108,12 @@ if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then
exit 1
fi

run_cargo build tarpc stable
run_cargo build tarpc_examples stable

run_cargo build tarpc beta
run_cargo build tarpc_examples beta

run_cargo build tarpc nightly
run_cargo build tarpc_examples nightly
run_cargo build stable
run_cargo build beta
run_cargo build nightly

# We still rely on some nightly stuff for tests
run_cargo test tarpc nightly
run_cargo test tarpc_examples nightly
run_cargo test nightly
else
printf "${YELLOW}NOT FOUND${NC}\n"
printf "${WARNING} Falling back to current toolchain: $(rustc -V)\n"
Expand Down
File renamed without changes.
Loading

0 comments on commit ae0a2e1

Please sign in to comment.