Skip to content

Commit

Permalink
communication: upgrade to Rust 2018
Browse files Browse the repository at this point in the history
Also fix a bunch of tests that had rotted.
  • Loading branch information
benesch committed Apr 4, 2019
1 parent c998c26 commit 3a740a2
Show file tree
Hide file tree
Showing 15 changed files with 57 additions and 56 deletions.
3 changes: 2 additions & 1 deletion communication/Cargo.toml
Expand Up @@ -3,6 +3,7 @@ name = "timely_communication"
version = "0.9.0"
authors = ["Frank McSherry <fmcsherry@me.com>"]
description = "Communication layer for timely dataflow"
edition = "2018"

# These URLs point to more information about the repository
documentation = "https://frankmcsherry.github.com/timely-dataflow"
Expand All @@ -20,7 +21,7 @@ bincode = { version = "1.0", optional = true }
serde_derive = "1.0"
serde = "1.0"
abomonation = "0.7"
abomonation_derive = "0.3"
abomonation_derive = "0.4"
timely_bytes = { path = "../bytes", version = "0.9" }
timely_logging = { path = "../logging", version = "0.9" }

Expand Down
4 changes: 2 additions & 2 deletions communication/src/allocator/counters.rs
Expand Up @@ -4,8 +4,8 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

use {Push, Pull};
use allocator::Event;
use crate::{Push, Pull};
use crate::allocator::Event;

/// The push half of an intra-thread channel.
pub struct Pusher<T, P: Push<T>> {
Expand Down
12 changes: 6 additions & 6 deletions communication/src/allocator/generic.rs
Expand Up @@ -7,13 +7,13 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

use allocator::thread::ThreadBuilder;
use allocator::process::ProcessBuilder as TypedProcessBuilder;
use allocator::{Allocate, AllocateBuilder, Event, Thread, Process};
use allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
use allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};
use crate::allocator::thread::ThreadBuilder;
use crate::allocator::process::ProcessBuilder as TypedProcessBuilder;
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread, Process};
use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};

use {Push, Pull, Data, Message};
use crate::{Push, Pull, Data, Message};

/// Enumerates known implementors of `Allocate`.
/// Passes trait method calls on to members.
Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/mod.rs
Expand Up @@ -17,7 +17,7 @@ pub mod counters;

pub mod zero_copy;

use {Data, Push, Pull, Message};
use crate::{Data, Push, Pull, Message};

/// A proto-allocator, which implements `Send` and can be completed with `build`.
///
Expand Down
12 changes: 6 additions & 6 deletions communication/src/allocator/process.rs
Expand Up @@ -7,9 +7,9 @@ use std::any::Any;
use std::sync::mpsc::{Sender, Receiver, channel};
use std::collections::{HashMap, VecDeque};

use allocator::thread::{ThreadBuilder};
use allocator::{Allocate, AllocateBuilder, Event, Thread};
use {Push, Pull, Message};
use crate::allocator::thread::{ThreadBuilder};
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread};
use crate::{Push, Pull, Message};

/// An allocator for inter-thread, intra-process communication
pub struct ProcessBuilder {
Expand Down Expand Up @@ -95,7 +95,7 @@ impl Allocate for Process {
let (sends, recv, empty) = {

// we may need to alloc a new channel ...
let mut entry = channels.entry(identifier).or_insert_with(|| {
let entry = channels.entry(identifier).or_insert_with(|| {

let mut pushers = Vec::new();
let mut pullers = Vec::new();
Expand Down Expand Up @@ -133,8 +133,8 @@ impl Allocate for Process {

if empty { channels.remove(&identifier); }

use allocator::counters::ArcPusher as CountPusher;
use allocator::counters::Puller as CountPuller;
use crate::allocator::counters::ArcPusher as CountPusher;
use crate::allocator::counters::Puller as CountPuller;

let sends =
sends.into_iter()
Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/thread.rs
Expand Up @@ -4,10 +4,10 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

use allocator::{Allocate, AllocateBuilder, Event};
use allocator::counters::Pusher as CountPusher;
use allocator::counters::Puller as CountPuller;
use {Push, Pull, Message};
use crate::allocator::{Allocate, AllocateBuilder, Event};
use crate::allocator::counters::Pusher as CountPusher;
use crate::allocator::counters::Puller as CountPuller;
use crate::{Push, Pull, Message};

/// Builder for single-threaded allocator.
pub struct ThreadBuilder;
Expand Down
14 changes: 7 additions & 7 deletions communication/src/allocator/zero_copy/allocator.rs
Expand Up @@ -6,13 +6,13 @@ use std::collections::{VecDeque, HashMap};

use bytes::arc::Bytes;

use networking::MessageHeader;
use crate::networking::MessageHeader;

use {Allocate, Message, Data, Push, Pull};
use allocator::AllocateBuilder;
use allocator::{Event, Process};
use allocator::process::ProcessBuilder;
use allocator::canary::Canary;
use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::AllocateBuilder;
use crate::allocator::{Event, Process};
use crate::allocator::process::ProcessBuilder;
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue, Signal};
use super::push_pull::{Pusher, PullerInner};
Expand Down Expand Up @@ -181,7 +181,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
.or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
.clone();

use allocator::counters::Puller as CountPuller;
use crate::allocator::counters::Puller as CountPuller;
let canary = Canary::new(identifier, self.canaries.clone());
let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, self.events().clone()));

Expand Down
12 changes: 6 additions & 6 deletions communication/src/allocator/zero_copy/allocator_process.rs
Expand Up @@ -6,11 +6,11 @@ use std::collections::{VecDeque, HashMap};

use bytes::arc::Bytes;

use networking::MessageHeader;
use crate::networking::MessageHeader;

use {Allocate, Message, Data, Push, Pull};
use allocator::{AllocateBuilder, Event};
use allocator::canary::Canary;
use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::{AllocateBuilder, Event};
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue, Signal};

Expand Down Expand Up @@ -146,7 +146,7 @@ impl Allocate for ProcessAllocator {
.or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
.clone();

use allocator::counters::Puller as CountPuller;
use crate::allocator::counters::Puller as CountPuller;
let canary = Canary::new(identifier, self.canaries.clone());
let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, self.events().clone()));

Expand All @@ -166,7 +166,7 @@ impl Allocate for ProcessAllocator {
.expect("non-existent channel dropped");
assert!(dropped.borrow().is_empty());
}
::std::mem::drop(canaries);
std::mem::drop(canaries);

let mut events = self.events.borrow_mut();

Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/zero_copy/initialize.rs
@@ -1,9 +1,9 @@
//! Network initialization.

use std::sync::Arc;
// use allocator::Process;
use allocator::process::ProcessBuilder;
use networking::create_sockets;
// use crate::allocator::Process;
use crate::allocator::process::ProcessBuilder;
use crate::networking::create_sockets;
use super::tcp::{send_loop, recv_loop};
use super::allocator::{TcpBuilder, new_vector};

Expand All @@ -29,7 +29,7 @@ impl Drop for CommsGuard {
}
}

use ::logging::{CommunicationSetup, CommunicationEvent};
use crate::logging::{CommunicationSetup, CommunicationEvent};
use logging_core::Logger;

/// Initializes network connections
Expand Down
10 changes: 5 additions & 5 deletions communication/src/allocator/zero_copy/push_pull.rs
Expand Up @@ -6,11 +6,11 @@ use std::collections::VecDeque;

use bytes::arc::Bytes;

use allocator::canary::Canary;
use networking::MessageHeader;
use crate::allocator::canary::Canary;
use crate::networking::MessageHeader;

use {Data, Push, Pull};
use allocator::Message;
use crate::{Data, Push, Pull};
use crate::allocator::Message;

use super::bytes_exchange::{BytesPush, SendEndpoint};

Expand Down Expand Up @@ -51,7 +51,7 @@ impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
{
let mut bytes = borrow.reserve(header.required_bytes());
assert!(bytes.len() >= header.required_bytes());
let mut writer = &mut bytes;
let writer = &mut bytes;
header.write_to(writer).expect("failed to write header!");
element.into_bytes(writer);
}
Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/zero_copy/tcp.rs
Expand Up @@ -3,14 +3,14 @@
use std::io::{Read, Write};
use std::net::TcpStream;

use networking::MessageHeader;
use crate::networking::MessageHeader;

use super::bytes_slab::BytesSlab;
use super::bytes_exchange::{MergeQueue, Signal};

use logging_core::Logger;

use ::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent};
use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent};

/// Repeatedly reads from a TcpStream and carves out messages.
///
Expand Down Expand Up @@ -95,7 +95,7 @@ pub fn recv_loop(
// Pass bytes along to targets.
for (index, staged) in stageds.iter_mut().enumerate() {
// FIXME: try to merge `staged` before handing it to BytesPush::extend
use allocator::zero_copy::bytes_exchange::BytesPush;
use crate::allocator::zero_copy::bytes_exchange::BytesPush;
targets[index].extend(staged.drain(..));
}
}
Expand Down Expand Up @@ -128,7 +128,7 @@ pub fn send_loop(

// TODO: Round-robin better, to release resources fairly when overloaded.
for source in sources.iter_mut() {
use allocator::zero_copy::bytes_exchange::BytesPull;
use crate::allocator::zero_copy::bytes_exchange::BytesPull;
source.drain_into(&mut stash);
}

Expand Down
14 changes: 7 additions & 7 deletions communication/src/initialize.rs
Expand Up @@ -9,11 +9,11 @@ use std::sync::Arc;

use std::any::Any;

use allocator::thread::ThreadBuilder;
use allocator::{AllocateBuilder, Process, Generic, GenericBuilder};
use allocator::zero_copy::initialize::initialize_networking;
use crate::allocator::thread::ThreadBuilder;
use crate::allocator::{AllocateBuilder, Process, Generic, GenericBuilder};
use crate::allocator::zero_copy::initialize::initialize_networking;

use ::logging::{CommunicationSetup, CommunicationEvent};
use crate::logging::{CommunicationSetup, CommunicationEvent};
use logging_core::Logger;


Expand Down Expand Up @@ -186,7 +186,7 @@ pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
config: Configuration,
func: F,
) -> Result<WorkerGuards<T>,String> {
let (allocators, others) = try!(config.try_build());
let (allocators, others) = config.try_build()?;
initialize_from(allocators, others, func)
}

Expand Down Expand Up @@ -255,13 +255,13 @@ where
let mut guards = Vec::new();
for (index, builder) in builders.into_iter().enumerate() {
let clone = logic.clone();
guards.push(try!(thread::Builder::new()
guards.push(thread::Builder::new()
.name(format!("worker thread {}", index))
.spawn(move || {
let communicator = builder.build();
(*clone)(communicator)
})
.map_err(|e| format!("{:?}", e))));
.map_err(|e| format!("{:?}", e))?);
}

Ok(WorkerGuards { guards, _others })
Expand Down
2 changes: 1 addition & 1 deletion communication/src/logging.rs
Expand Up @@ -26,7 +26,7 @@ pub struct MessageEvent {
/// true for send event, false for receive event
pub is_send: bool,
/// associated message header.
pub header: ::networking::MessageHeader,
pub header: crate::networking::MessageHeader,
}

/// Starting or stopping communication threads.
Expand Down
2 changes: 1 addition & 1 deletion communication/src/message.rs
Expand Up @@ -3,7 +3,7 @@
use std::sync::Arc;
use bytes::arc::Bytes;
use abomonation;
use ::Data;
use crate::Data;

/// Either an immutable or mutable reference.
pub enum RefOrMut<'a, T> where T: 'a {
Expand Down
2 changes: 1 addition & 1 deletion communication/src/networking.rs
Expand Up @@ -108,7 +108,7 @@ pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bo
/// Result contains connections [my_index + 1, addresses.len() - 1].
pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
let listener = try!(TcpListener::bind(&addresses[my_index][..]));
let listener = TcpListener::bind(&addresses[my_index][..])?;

for _ in (my_index + 1) .. addresses.len() {
let mut stream = listener.accept()?.0;
Expand Down

0 comments on commit 3a740a2

Please sign in to comment.