Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update serde_closure requirement from 0.1 to 0.2 #36

Merged
merged 3 commits into from
Oct 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ pin-utils = "0.1.0-alpha.4"
rand = "0.7"
relative = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.1"
serde_closure = "0.2"
serde_traitobject = "0.2"
serde_json = "1.0"
serde_pipe = "0.1"
tcp_typed = "0.1"
Expand All @@ -68,7 +69,6 @@ hex = "0.4"
itertools = "0.8"
multiset = "0.0"
regex = "1.0"
serde_traitobject = "0.1.5"
sha1 = "0.6"
systemstat = "0.1"

Expand Down
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
endpoint: alecmocatta
default:
rust_toolchain: nightly
rust_lint_toolchain: nightly-2019-10-04
rust_lint_toolchain: nightly-2019-10-17
rust_flags: ''
rust_features: 'no_alloc;no_alloc distribute_binaries'
rust_target_check: ''
Expand Down
4 changes: 3 additions & 1 deletion constellation-internal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
clippy::needless_pass_by_value,
clippy::large_enum_variant,
clippy::if_not_else,
clippy::inline_always
clippy::inline_always,
clippy::must_use_candidate,
clippy::double_must_use
)]

mod ext;
Expand Down
9 changes: 5 additions & 4 deletions examples/all_to_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ fn main() {
mem: 20 * 1024 * 1024,
..Resources::default()
},
serde_closure::FnOnce!([process_index] move |parent| {
FnOnce!(move |parent| {
let receiver = Receiver::<Vec<Pid>>::new(parent);
let pids = receiver.recv().block().unwrap();
assert_eq!(pids[process_index], pid());
let mut senders: Vec<Option<Sender<usize>>> = Vec::with_capacity(pids.len());
let mut receivers: Vec<Option<Receiver<usize>>> = Vec::with_capacity(pids.len());
let mut receivers: Vec<Option<Receiver<usize>>> =
Vec::with_capacity(pids.len());
for i in 0..pids.len() {
for j in 0..pids.len() {
if i == process_index {
Expand All @@ -76,8 +77,8 @@ fn main() {
}
}
}
for (i,receiver) in receivers.iter().enumerate() {
for (j,sender) in senders.iter().enumerate() {
for (i, receiver) in receivers.iter().enumerate() {
for (j, sender) in senders.iter().enumerate() {
if i == j {
continue;
}
Expand Down
8 changes: 4 additions & 4 deletions examples/fibonacci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn fib_processes(x: usize) -> usize {
}
let left_pid = spawn(
Resources::default(),
FnOnce!([x] move |parent_pid| {
FnOnce!(move |parent_pid| {
Sender::<usize>::new(parent_pid)
.send(fib_processes(x - 1))
.block()
Expand All @@ -31,7 +31,7 @@ fn fib_processes(x: usize) -> usize {

let right_pid = spawn(
Resources::default(),
FnOnce!([x] move |parent_pid| {
FnOnce!(move |parent_pid| {
Sender::<usize>::new(parent_pid)
.send(fib_processes(x - 2))
.block()
Expand All @@ -52,7 +52,7 @@ fn fib_processes_async(x: usize) -> Result<usize, Box<dyn st::Error>> {
let left = async {
let pid = spawn(
Resources::default(),
FnOnce!([x] move |parent| {
FnOnce!(move |parent| {
Sender::<Msg>::new(parent)
.send(fib_processes_async(x - 1))
.block()
Expand All @@ -66,7 +66,7 @@ fn fib_processes_async(x: usize) -> Result<usize, Box<dyn st::Error>> {
let right = async {
let pid = spawn(
Resources::default(),
FnOnce!([x] move |parent| {
FnOnce!(move |parent| {
Sender::<Msg>::new(parent)
.send(fib_processes_async(x - 2))
.block()
Expand Down
22 changes: 15 additions & 7 deletions examples/fork_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,34 +51,42 @@ fn main() {
Resources::default(),
// Make this closure serializable by wrapping with serde_closure's
// FnOnce!() macro, which requires explicitly listing captured variables.
serde_closure::FnOnce!([i] move |parent| {
FnOnce!(move |parent| {
println!("process {}: commencing hashing", i);

let mut rng = rand::thread_rng();

// To record the lowest hash value seen
let mut lowest: Option<(String,[u8;20])> = None;
let mut lowest: Option<(String, [u8; 20])> = None;

// Loop for ten seconds
let start = time::Instant::now();
while start.elapsed() < time::Duration::new(10,0) {
while start.elapsed() < time::Duration::new(10, 0) {
// Generate a random 7 character string
let string: String = iter::repeat(()).map(|()| rng.sample(Alphanumeric)).take(7).collect();
let string: String = iter::repeat(())
.map(|()| rng.sample(Alphanumeric))
.take(7)
.collect();

// Hash the string
let hash = Sha1::from(&string).digest().bytes();

// Update our record of the lowest hash value seen
if lowest.is_none() || lowest.as_ref().unwrap().1 >= hash {
lowest = Some((string,hash));
lowest = Some((string, hash));
}
}

let lowest = lowest.unwrap();
println!("process {}: lowest hash was {} from string \"{}\"", i, hex::encode(lowest.1), lowest.0);
println!(
"process {}: lowest hash was {} from string \"{}\"",
i,
hex::encode(lowest.1),
lowest.0
);

// Create a `Sender` half of a channel to our parent
let sender = Sender::<(String,[u8;20])>::new(parent);
let sender = Sender::<(String, [u8; 20])>::new(parent);

// Send our record along the channel to our parent
sender.send(lowest).block();
Expand Down
11 changes: 7 additions & 4 deletions examples/process_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl ProcessPool {
resources,
// Make this closure serializable by wrapping with serde_closure's
// FnOnce!() macro, which requires explicitly listing captured variables.
serde_closure::FnOnce!([] move |parent| {
FnOnce!(move |parent| {
// println!("process {}: awaiting work", i);

// Create a `Sender` half of a channel to our parent
Expand Down Expand Up @@ -136,7 +136,7 @@ impl ProcessPool {
let process = &mut self.processes[process_index];
process
.sender
.send(Some(st::Box::new(serde_closure::FnOnce!([work] move || {
.send(Some(st::Box::new(FnOnce!(move || {
let work: F = work;
st::Box::new(work()) as Response
})) as Request))
Expand Down Expand Up @@ -188,8 +188,11 @@ fn main() {

let handles = (0..processes * 3)
.map(|i| {
pool.spawn(serde_closure::FnOnce!([i] move || -> String {
thread::sleep(rand::thread_rng().gen_range(time::Duration::new(0,0),time::Duration::new(5,0)));
pool.spawn(FnOnce!(move || -> String {
thread::sleep(
rand::thread_rng()
.gen_range(time::Duration::new(0, 0), time::Duration::new(5, 0)),
);
format!("warm greetings from job {}", i)
}))
})
Expand Down
3 changes: 1 addition & 2 deletions src/bin/constellation/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ list of all the nodes:
For more information, see the documentation at
https://github.com/alecmocatta/constellation/ or run again with --help -v
"#;
const VERBOSE_HELP: &str =
r#"A constellation cluster consists of one or more nodes. A node is started like
const VERBOSE_HELP: &str = r#"A constellation cluster consists of one or more nodes. A node is started like
this:

constellation 10.0.0.2:9999
Expand Down
4 changes: 2 additions & 2 deletions src/bin/constellation/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ fn monitor_process(
}

fn recce(
#[cfg(feature = "distribute_binaries")] /* https://github.com/rust-lang/rustfmt/issues/3623 */
binary: &File, args: &[OsString], vars: &[(OsString, OsString)],
#[cfg(feature = "distribute_binaries")] binary: &File, args: &[OsString],
vars: &[(OsString, OsString)],
) -> Result<Resources, ()> {
let (reader, writer) = nix::unistd::pipe().unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/bin/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn main() {
}
let bridge_address: net::SocketAddr = args.arg_host.parse().unwrap();
let path = args.arg_binary;
let args: Vec<ffi::OsString> = iter::once(ffi::OsString::from(path.clone()))
let args: Vec<ffi::OsString> = iter::once(ffi::OsString::from(&path))
.chain(args.arg_args.into_iter().map(ffi::OsString::from))
.collect();
let vars: Vec<(ffi::OsString, ffi::OsString)> = env::vars_os().collect();
Expand Down
22 changes: 13 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
clippy::if_not_else,
clippy::module_name_repetitions,
clippy::new_ret_no_self,
clippy::type_complexity
clippy::type_complexity,
clippy::must_use_candidate
)]

#[cfg(doctest)]
Expand Down Expand Up @@ -67,6 +68,8 @@ use constellation_internal::{
pub use channel::ChannelError;
#[doc(inline)]
pub use constellation_internal::{Pid, Resources, SpawnError, TrySpawnError, RESOURCES_DEFAULT};
#[doc(inline)]
pub use serde_closure::{Fn, FnMut, FnOnce};

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -177,7 +180,7 @@ impl<T: Serialize> Sender<T> {

/// Nonblocking send.
///
/// If sending would not block, `Some` is returned with a FnOnce that accepts a `T` to send.
/// If sending would not block, `Some` is returned with a `FnOnce` that accepts a `T` to send.
/// If sending would block, `None` is returned.
pub fn try_send<'a>(&'a self) -> Option<impl FnOnce(T) + 'a>
where
Expand Down Expand Up @@ -349,7 +352,7 @@ impl<T: DeserializeOwned> Receiver<T> {

/// Nonblocking recv.
///
/// If receiving would not block, `Some` is returned with a FnOnce that returns a `Result<T, ChannelError>`.
/// If receiving would not block, `Some` is returned with a `FnOnce` that returns a `Result<T, ChannelError>`.
/// If receiving would block, `None` is returned.
pub fn try_recv<'a>(&'a self) -> Option<impl FnOnce() -> Result<T, ChannelError> + 'a>
where
Expand Down Expand Up @@ -493,7 +496,7 @@ pub fn resources() -> Resources {

#[allow(clippy::too_many_lines)]
fn spawn_native(
resources: Resources, f: serde_closure::FnOnce<(Vec<u8>,), fn((Vec<u8>,), (Pid,))>,
resources: Resources, f: &(dyn serde_traitobject::FnOnce<(Pid,), Output = ()> + 'static),
_block: bool,
) -> Result<Pid, TrySpawnError> {
trace!("spawn_native");
Expand Down Expand Up @@ -646,7 +649,8 @@ fn spawn_native(
}

fn spawn_deployed(
resources: Resources, f: serde_closure::FnOnce<(Vec<u8>,), fn((Vec<u8>,), (Pid,))>, block: bool,
resources: Resources, f: &(dyn serde_traitobject::FnOnce<(Pid,), Output = ()> + 'static),
block: bool,
) -> Result<Pid, TrySpawnError> {
trace!("spawn_deployed");
let stream = unsafe { TcpStream::from_raw_fd(SCHEDULER_FD) };
Expand Down Expand Up @@ -709,15 +713,15 @@ async fn spawn_inner<T: FnOnce(Pid) + Serialize + DeserializeOwned>(
});
let arg: Vec<u8> = bincode::serialize(&start).unwrap();

let start: serde_closure::FnOnce<(Vec<u8>,), fn((Vec<u8>,), (Pid,))> = serde_closure::FnOnce!([arg]move|parent|{
let start = FnOnce!(move |parent| {
let arg: Vec<u8> = arg;
let closure: T = bincode::deserialize(&arg).unwrap();
closure(parent)
});
if !deployed {
spawn_native(resources, start, block)
spawn_native(resources, &start, block)
} else {
spawn_deployed(resources, start, block)
spawn_deployed(resources, &start, block)
}
}

Expand Down Expand Up @@ -1435,7 +1439,7 @@ pub fn init(resources: Resources) {
let parent: Pid = bincode::deserialize_from(&mut argument)
.map_err(map_bincode_err)
.unwrap();
let start: serde_closure::FnOnce<(Vec<u8>,), fn((Vec<u8>,), (Pid,))> =
let start: Box<dyn serde_traitobject::FnOnce<(Pid,), Output = ()>> =
bincode::deserialize_from(&mut argument)
.map_err(map_bincode_err)
.unwrap();
Expand Down
22 changes: 10 additions & 12 deletions tests/message-alltoall-sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ fn sub2<
mem: 20 * 1024 * 1024,
..Resources::default()
},
FnOnce!([arg] move |parent| {
sub2(parent, arg)
}),
FnOnce!(move |parent| sub2(parent, arg)),
)
.block()
.expect("spawn() failed to allocate process");
Expand Down Expand Up @@ -185,7 +183,7 @@ fn main() {
mem: 20 * 1024 * 1024,
..Resources::default()
},
FnOnce!([arg] move |parent| {
FnOnce!(move |parent| {
println!("! sub1");
let receiver = Receiver::new(parent);
let hi: String = receiver.recv().block().unwrap();
Expand Down Expand Up @@ -219,9 +217,7 @@ fn main() {
mem: 20 * 1024 * 1024,
..Resources::default()
},
FnOnce!([arg] move |parent| {
sub2(parent, arg)
}),
FnOnce!(move |parent| sub2(parent, arg)),
)
.block()
.expect("spawn() failed to allocate process");
Expand All @@ -237,14 +233,16 @@ fn main() {
mem: 20 * 1024 * 1024,
..Resources::default()
},
FnOnce!([arg] move |parent| {
FnOnce!(move |parent| {
println!("! sub3");
let receiver = Receiver::<Vec<Pid>>::new(parent);
let pids = receiver.recv().block().unwrap();
// println!("{:?}", pids);
assert_eq!(pids[arg], pid());
let mut senders: Vec<Option<Sender<usize>>> = Vec::with_capacity(pids.len());
let mut receivers: Vec<Option<Receiver<usize>>> = Vec::with_capacity(pids.len());
let mut senders: Vec<Option<Sender<usize>>> =
Vec::with_capacity(pids.len());
let mut receivers: Vec<Option<Receiver<usize>>> =
Vec::with_capacity(pids.len());
for i in 0..pids.len() {
for j in 0..pids.len() {
if i == arg {
Expand All @@ -266,8 +264,8 @@ fn main() {
assert_eq!(senders.len(), pids.len());
assert_eq!(receivers.len(), pids.len());
println!("done");
for (i,receiver) in receivers.iter().enumerate() {
for (j,sender) in senders.iter().enumerate() {
for (i, receiver) in receivers.iter().enumerate() {
for (j, sender) in senders.iter().enumerate() {
if i == j {
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn main() {
mem: 20 * 1024 * 1024 + i,
cpu: 65536 / 1000,
},
FnOnce!([i] move |_parent| {
FnOnce!(move |_parent| {
assert_eq!(resources().mem, 20 * 1024 * 1024 + i);
println!("hi {:?}", resources());
}),
Expand Down
7 changes: 4 additions & 3 deletions tests/spawn-multiple-futures-send-recv-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,16 @@ fn main() {
mem: 20 * 1024 * 1024,
..Resources::default()
},
FnOnce!([i] move |parent| {
FnOnce!(move |parent| {
println!("hi {}", i);
let (receiver, sender) = (
Receiver::<Option<String>>::new(parent),
Sender::<Option<String>>::new(parent),
);
futures::executor::block_on(
receiver.forward(sender.sink_map_err(|_|unreachable!()))
).unwrap();
receiver.forward(sender.sink_map_err(|_| unreachable!())),
)
.unwrap();
println!("done {}", i);
}),
)
Expand Down
2 changes: 1 addition & 1 deletion tests/spawn-multiple-send-recv-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ fn main() {
mem: 20 * 1024 * 1024,
..Resources::default()
},
FnOnce!([i] move |parent| {
FnOnce!(move |parent| {
println!("hi {}", i);
let receiver = Receiver::<Option<String>>::new(parent);
let sender = Sender::<Option<String>>::new(parent);
Expand Down
Loading