Skip to content

Commit

Permalink
*: switch to futures 0.3 (tikv#447)
Browse files Browse the repository at this point in the history
Close tikv#237.

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
  • Loading branch information
BusyJay committed Apr 2, 2020
1 parent acfa3a4 commit fd66ec7
Show file tree
Hide file tree
Showing 41 changed files with 867 additions and 945 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Expand Up @@ -19,7 +19,7 @@ all-features = true
[dependencies]
grpcio-sys = { path = "grpc-sys", version = "0.5.0" }
libc = "0.2"
futures = "^0.1.15"
futures = "0.3"
protobuf = { version = "2.0", optional = true }
prost = { version = "0.6", optional = true }
bytes = { version = "0.5", optional = true }
Expand All @@ -42,3 +42,6 @@ debug = true

[badges]
travis-ci = { repository = "pingcap/grpc-rs" }

[patch.crates-io]
grpcio-compiler = { path = "compiler", version = "0.5.0", default-features = false }
4 changes: 2 additions & 2 deletions benchmark/Cargo.toml
Expand Up @@ -12,13 +12,13 @@ prost-codec = ["grpcio/prost-codec", "grpcio-proto/prost-codec"]
[dependencies]
grpcio = { path = ".." }
grpcio-proto = { path = "../proto", default-features = false }
futures = "0.1"
futures = "0.3"
libc = "0.2"
grpcio-sys = { path = "../grpc-sys" }
rand = "0.7"
rand_distr = "0.2"
rand_xorshift = "0.2"
tokio-timer = "0.1"
futures-timer = "3.0"
clap = "2.23"
log = "0.4"
slog = "2.0"
Expand Down
22 changes: 17 additions & 5 deletions benchmark/src/bench.rs
Expand Up @@ -6,7 +6,7 @@ use std::io::Read;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use futures::{Future, Sink, Stream};
use futures::prelude::*;
use grpc::{
self, ClientStreamingSink, DuplexSink, MessageReader, Method, MethodType, RequestStream,
RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, ServiceBuilder, UnarySink,
Expand Down Expand Up @@ -39,9 +39,16 @@ impl BenchmarkService for Benchmark {
&mut self,
ctx: RpcContext,
stream: RequestStream<SimpleRequest>,
sink: DuplexSink<SimpleResponse>,
mut sink: DuplexSink<SimpleResponse>,
) {
let f = sink.send_all(stream.map(|req| (gen_resp(&req), WriteFlags::default())));
let f = async move {
sink.send_all(
&mut stream.map(|req| req.map(|req| (gen_resp(&req), WriteFlags::default()))),
)
.await?;
sink.close().await?;
Ok(())
};
let keep_running = self.keep_running.clone();
spawn!(ctx, keep_running, "streaming", f)
}
Expand Down Expand Up @@ -90,9 +97,14 @@ impl Generic {
&self,
ctx: &RpcContext,
stream: RequestStream<Vec<u8>>,
sink: DuplexSink<Vec<u8>>,
mut sink: DuplexSink<Vec<u8>>,
) {
let f = sink.send_all(stream.map(|req| (req, WriteFlags::default())));
let f = async move {
sink.send_all(&mut stream.map(|req| req.map(|req| (req, WriteFlags::default()))))
.await?;
sink.close().await?;
Ok(())
};
let keep_running = self.keep_running.clone();
spawn!(ctx, keep_running, "streaming", f)
}
Expand Down
202 changes: 73 additions & 129 deletions benchmark/src/client.rs
Expand Up @@ -6,28 +6,23 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use futures::future::Loop;
use futures::sync::oneshot::{self, Receiver, Sender};
use futures::{future, Async, Future, Sink, Stream};
use grpc::{
use futures::channel::oneshot::{self, Receiver, Sender};
use futures::prelude::*;
use grpcio::{
CallOption, Channel, ChannelBuilder, Client as GrpcClient, EnvBuilder, Environment, WriteFlags,
};
use grpc_proto::testing::control::{ClientConfig, ClientType, RpcType};
use grpc_proto::testing::messages::SimpleRequest;
use grpc_proto::testing::services_grpc::BenchmarkServiceClient;
use grpc_proto::testing::stats::ClientStats;
use grpc_proto::util as proto_util;
use grpcio_proto::testing::control::{ClientConfig, ClientType, RpcType};
use grpcio_proto::testing::messages::SimpleRequest;
use grpcio_proto::testing::services_grpc::BenchmarkServiceClient;
use grpcio_proto::testing::stats::ClientStats;
use grpcio_proto::util as proto_util;
use rand::{self, SeedableRng};
use rand_distr::{Distribution, Exp};
use rand_xorshift::XorShiftRng;
use tokio_timer::{Sleep, Timer};

use crate::bench;
use crate::error::Error;
use crate::util::{self, CpuRecorder, Histogram};

type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>;

fn gen_req(cfg: &ClientConfig) -> SimpleRequest {
let mut req = SimpleRequest::default();
let payload_config = cfg.get_payload_config();
Expand Down Expand Up @@ -87,7 +82,6 @@ struct ExecutorContext<B> {
keep_running: Arc<AtomicBool>,
histogram: Arc<Mutex<Histogram>>,
backoff: B,
timer: Timer,
_trace: Sender<()>,
}

Expand All @@ -97,15 +91,13 @@ impl<B: Backoff> ExecutorContext<B> {
histogram: Arc<Mutex<Histogram>>,
keep_running: Arc<AtomicBool>,
backoff: B,
timer: Timer,
) -> (ExecutorContext<B>, Receiver<()>) {
let (tx, rx) = oneshot::channel();
(
ExecutorContext {
keep_running,
histogram,
backoff,
timer,
_trace: tx,
},
rx,
Expand All @@ -118,8 +110,8 @@ impl<B: Backoff> ExecutorContext<B> {
his.observe(f);
}

fn backoff_async(&mut self) -> Option<Sleep> {
self.backoff.backoff_time().map(|dur| self.timer.sleep(dur))
fn backoff_async(&mut self) -> Option<futures_timer::Delay> {
self.backoff.backoff_time().map(futures_timer::Delay::new)
}

fn backoff(&mut self) {
Expand Down Expand Up @@ -151,50 +143,36 @@ impl<B: Backoff + Send + 'static> GenericExecutor<B> {
}
}

fn execute_stream(self) {
fn execute_stream(mut self) {
let client = self.client.clone();
let keep_running = self.ctx.keep_running.clone();
let (sender, receiver) = self
let (mut sender, mut receiver) = self
.client
.duplex_streaming(
&bench::METHOD_BENCHMARK_SERVICE_GENERIC_CALL,
CallOption::default(),
)
.unwrap();
let f = future::loop_fn(
(sender, self, receiver),
move |(sender, mut executor, receiver)| {
let f = async move {
loop {
let latency_timer = Instant::now();
let send = sender.send((executor.req.clone(), WriteFlags::default()));
send.map_err(Error::from).and_then(move |sender| {
receiver
.into_future()
.map_err(|(e, _)| Error::from(e))
.and_then(move |(_, r)| {
executor.ctx.observe_latency(latency_timer.elapsed());
let mut time = executor.ctx.backoff_async();
let mut res = Some((sender, executor, r));
future::poll_fn(move || {
if let Some(ref mut t) = time {
try_ready!(t.poll());
}
time.take();
let r = res.take().unwrap();
let l = if r.1.ctx.keep_running() {
Loop::Continue(r)
} else {
Loop::Break(r)
};
Ok(Async::Ready(l))
})
})
})
},
)
.and_then(|(mut s, e, r)| {
future::poll_fn(move || s.close().map_err(Error::from)).map(|_| (e, r))
})
.and_then(|(e, r)| r.into_future().map(|_| e).map_err(|(e, _)| Error::from(e)));
sender
.send((self.req.clone(), WriteFlags::default()))
.await?;
receiver.try_next().await?;
self.ctx.observe_latency(latency_timer.elapsed());
let mut time = self.ctx.backoff_async();
if let Some(t) = &mut time {
t.await;
}
if !self.ctx.keep_running() {
break;
}
}
sender.close().await?;
receiver.try_next().await?;
Ok(())
};
spawn!(client, keep_running, "streaming ping pong", f)
}
}
Expand Down Expand Up @@ -228,74 +206,52 @@ impl<B: Backoff + Send + 'static> RequestExecutor<B> {
});
}

fn execute_unary_async(self) {
fn execute_unary_async(mut self) {
let client = self.client.clone();
let keep_running = self.ctx.keep_running.clone();
let f = future::loop_fn(self, move |mut executor| {
let latency_timer = Instant::now();
let handler = executor.client.unary_call_async(&executor.req).unwrap();

handler.map_err(Error::from).and_then(move |_| {
let f = async move {
loop {
let latency_timer = Instant::now();
self.client.unary_call_async(&self.req)?.await?;
let elapsed = latency_timer.elapsed();
executor.ctx.observe_latency(elapsed);
let mut time = executor.ctx.backoff_async();
let mut res = Some(executor);
future::poll_fn(move || {
if let Some(ref mut t) = time {
try_ready!(t.poll());
}
time.take();
let executor = res.take().unwrap();
let l = if executor.ctx.keep_running() {
Loop::Continue(executor)
} else {
Loop::Break(())
};
Ok(Async::Ready(l))
})
})
});
self.ctx.observe_latency(elapsed);
let mut time = self.ctx.backoff_async();
if let Some(t) = &mut time {
t.await;
}
if !self.ctx.keep_running() {
break;
}
}
Ok(())
};
spawn!(client, keep_running, "unary async", f)
}

fn execute_stream_ping_pong(self) {
fn execute_stream_ping_pong(mut self) {
let client = self.client.clone();
let keep_running = self.ctx.keep_running.clone();
let (sender, receiver) = self.client.streaming_call().unwrap();
let f = future::loop_fn(
(sender, self, receiver),
move |(sender, mut executor, receiver)| {
let (mut sender, mut receiver) = self.client.streaming_call().unwrap();
let f = async move {
loop {
let latency_timer = Instant::now();
let send = sender.send((executor.req.clone(), WriteFlags::default()));
send.map_err(Error::from).and_then(move |sender| {
receiver
.into_future()
.map_err(|(e, _)| Error::from(e))
.and_then(move |(_, r)| {
executor.ctx.observe_latency(latency_timer.elapsed());
let mut time = executor.ctx.backoff_async();
let mut res = Some((sender, executor, r));
future::poll_fn(move || {
if let Some(ref mut t) = time {
try_ready!(t.poll());
}
time.take();
let r = res.take().unwrap();
let l = if r.1.ctx.keep_running() {
Loop::Continue(r)
} else {
Loop::Break(r)
};
Ok(Async::Ready(l))
})
})
})
},
)
.and_then(|(mut s, e, r)| {
future::poll_fn(move || s.close().map_err(Error::from)).map(|_| (e, r))
})
.and_then(|(e, r)| r.into_future().map(|_| e).map_err(|(e, _)| Error::from(e)));
sender
.send((self.req.clone(), WriteFlags::default()))
.await?;
receiver.try_next().await?;
self.ctx.observe_latency(latency_timer.elapsed());
let mut time = self.ctx.backoff_async();
if let Some(t) = &mut time {
t.await;
}
if !self.ctx.keep_running() {
break;
}
}
sender.close().await?;
receiver.try_next().await?;
Ok(())
};
spawn!(client, keep_running, "streaming ping pong", f);
}
}
Expand Down Expand Up @@ -390,26 +346,23 @@ impl Client {
his_param.get_resolution(),
his_param.get_max_possible(),
)));
let timer = Timer::default();
let keep_running = Arc::new(AtomicBool::new(true));
let mut running_reqs = Vec::with_capacity(client_channels * outstanding_rpcs_per_channel);

for ch in channels {
for _ in 0..cfg.get_outstanding_rpcs_per_channel() {
let his = his.clone();
let timer = timer.clone();
let ch = ch.clone();
let rx = if load_params.has_poisson() {
let lambda = load_params.get_poisson().get_offered_load()
/ client_channels as f64
/ outstanding_rpcs_per_channel as f64;
let poisson = Poisson::new(lambda);
let (ctx, rx) = ExecutorContext::new(his, keep_running.clone(), poisson, timer);
let (ctx, rx) = ExecutorContext::new(his, keep_running.clone(), poisson);
execute(ctx, ch, client_type, cfg);
rx
} else {
let (ctx, rx) =
ExecutorContext::new(his, keep_running.clone(), ClosedLoop, timer);
let (ctx, rx) = ExecutorContext::new(his, keep_running.clone(), ClosedLoop);
execute(ctx, ch, client_type, cfg);
rx
};
Expand Down Expand Up @@ -442,18 +395,9 @@ impl Client {
stats
}

pub fn shutdown(&mut self) -> BoxFuture<(), Error> {
pub fn shutdown(&mut self) -> impl Future<Output = ()> + Send {
self.keep_running.store(false, Ordering::Relaxed);
let mut tasks = self.running_reqs.take().unwrap();
let mut idx = tasks.len();
Box::new(future::poll_fn(move || {
while idx > 0 {
if let Ok(Async::NotReady) = tasks[idx - 1].poll() {
return Ok(Async::NotReady);
}
idx -= 1;
}
Ok(Async::Ready(()))
}))
let tasks = self.running_reqs.take().unwrap();
futures::future::join_all(tasks).map(|_| ())
}
}

0 comments on commit fd66ec7

Please sign in to comment.