Skip to content
Permalink
Browse files

fix build warnings

  • Loading branch information...
max-sixty committed Jul 9, 2019
1 parent 28430ee commit 28d47d72cc64441d27f3b7aceca13b8d551fdb27
@@ -55,15 +55,15 @@ pub mod rc {
/// Importantly, this is unavailable for as long as the struct exists, which may
/// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
/// enough to make a strong statement about this.
sequestered: Rc<Box<Any>>,
sequestered: Rc<Box<dyn Any>>,
}

impl Bytes {

/// Create a new instance from a byte allocation.
pub fn from<B>(bytes: B) -> Bytes where B: DerefMut<Target=[u8]>+'static {

let mut boxed = Box::new(bytes) as Box<Any>;
let mut boxed = Box::new(bytes) as Box<dyn Any>;

let ptr = boxed.downcast_mut::<B>().unwrap().as_mut_ptr();
let len = boxed.downcast_ref::<B>().unwrap().len();
@@ -146,7 +146,7 @@ pub mod arc {
/// Importantly, this is unavailable for as long as the struct exists, which may
/// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
/// enough to make a strong statement about this.
sequestered: Arc<Box<Any>>,
sequestered: Arc<Box<dyn Any>>,
}

unsafe impl Send for Bytes { }
@@ -156,7 +156,7 @@ pub mod arc {
/// Create a new instance from a byte allocation.
pub fn from<B>(bytes: B) -> Bytes where B : DerefMut<Target=[u8]>+'static {

let mut boxed = Box::new(bytes) as Box<Any>;
let mut boxed = Box::new(bytes) as Box<dyn Any>;

let ptr = boxed.downcast_mut::<B>().unwrap().as_mut_ptr();
let len = boxed.downcast_ref::<B>().unwrap().len();
@@ -48,7 +48,7 @@ impl Generic {
}
}
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
match self {
&mut Generic::Thread(ref mut t) => t.allocate(identifier),
&mut Generic::Process(ref mut p) => p.allocate(identifier),
@@ -87,7 +87,7 @@ impl Generic {
impl Allocate for Generic {
fn index(&self) -> usize { self.index() }
fn peers(&self) -> usize { self.peers() }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
self.allocate(identifier)
}

@@ -43,7 +43,7 @@ pub trait Allocate {
/// The number of workers in the communication group.
fn peers(&self) -> usize;
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>);
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
/// A shared queue of communication events with channel identifier.
///
/// It is expected that users of the channel allocator will regularly
@@ -19,7 +19,7 @@ pub struct ProcessBuilder {
index: usize,
peers: usize,
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
channels: Arc<Mutex<HashMap<usize, Box<Any+Send>>>>,
channels: Arc<Mutex<HashMap<usize, Box<dyn Any+Send>>>>,

// Buzzers for waking other local workers.
buzzers_send: Vec<Sender<Buzzer>>,
@@ -61,7 +61,7 @@ pub struct Process {
index: usize,
peers: usize,
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
channels: Arc<Mutex<HashMap</* channel id */ usize, Box<Any+Send>>>>,
channels: Arc<Mutex<HashMap</* channel id */ usize, Box<dyn Any+Send>>>>,
buzzers: Vec<Buzzer>,
counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
@@ -110,7 +110,7 @@ impl Process {
impl Allocate for Process {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Any+Send+Sync+'static>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
fn allocate<T: Any+Send+Sync+'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {

// this is race-y global initialisation of all channels for all workers, performed by the
// first worker that enters this critical section
@@ -166,10 +166,10 @@ impl Allocate for Process {
sends.into_iter()
.enumerate()
.map(|(i,s)| CountPusher::new(s, identifier, self.counters_send[i].clone()))
.map(|s| Box::new(s) as Box<Push<super::Message<T>>>)
.map(|s| Box::new(s) as Box<dyn Push<super::Message<T>>>)
.collect::<Vec<_>>();

let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box<Pull<super::Message<T>>>;
let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box<dyn Pull<super::Message<T>>>;

(sends, recv)
}
@@ -28,7 +28,7 @@ pub struct Thread {
impl Allocate for Thread {
fn index(&self) -> usize { 0 }
fn peers(&self) -> usize { 1 }
fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
let (pusher, puller) = Thread::new_from(identifier, self.events.clone());
(vec![Box::new(pusher)], Box::new(puller))
}
@@ -133,10 +133,10 @@ pub struct TcpAllocator<A: Allocate> {
impl<A: Allocate> Allocate for TcpAllocator<A> {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {

// Result list of boxed pushers.
let mut pushes = Vec::<Box<Push<Message<T>>>>::new();
let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();

// Inner exchange allocations.
let inner_peers = self.inner.peers();
@@ -119,9 +119,9 @@ pub struct ProcessAllocator {
impl Allocate for ProcessAllocator {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {

let mut pushes = Vec::<Box<Push<Message<T>>>>::new();
let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();

for target_index in 0 .. self.peers() {

@@ -38,7 +38,7 @@ pub fn initialize_networking(
my_index: usize,
threads: usize,
noisy: bool,
log_sender: Box<Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
let log_sender = Arc::new(log_sender);
@@ -103,15 +103,15 @@ impl<T:Data> Pull<Message<T>> for Puller<T> {
/// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared
/// allocation.
pub struct PullerInner<T> {
inner: Box<Pull<Message<T>>>, // inner pullable (e.g. intra-process typed queue)
inner: Box<dyn Pull<Message<T>>>, // inner pullable (e.g. intra-process typed queue)
_canary: Canary,
current: Option<Message<T>>,
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
}

impl<T:Data> PullerInner<T> {
/// Creates a new `PullerInner` instance from a shared queue.
pub fn new(inner: Box<Pull<Message<T>>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
pub fn new(inner: Box<dyn Pull<Message<T>>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
PullerInner {
inner,
_canary,
@@ -34,7 +34,7 @@ pub enum Configuration {
/// Verbosely report connection process
report: bool,
/// Closure to create a new logger for a communication thread
log_fn: Box<Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent, CommunicationSetup>> + Send + Sync>,
log_fn: Box<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent, CommunicationSetup>> + Send + Sync>,
}
}

@@ -104,7 +104,7 @@ impl Configuration {
}

/// Attempts to assemble the described communication infrastructure.
pub fn try_build(self) -> Result<(Vec<GenericBuilder>, Box<Any>), String> {
pub fn try_build(self) -> Result<(Vec<GenericBuilder>, Box<dyn Any>), String> {
match self {
Configuration::Thread => {
Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(())))
@@ -250,7 +250,7 @@ pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
/// ```
pub fn initialize_from<A, T, F>(
builders: Vec<A>,
_others: Box<Any>,
_others: Box<dyn Any>,
func: F,
) -> Result<WorkerGuards<T>,String>
where
@@ -277,7 +277,7 @@ where
/// Maintains `JoinHandle`s for worker threads.
pub struct WorkerGuards<T:Send+'static> {
guards: Vec<::std::thread::JoinHandle<T>>,
_others: Box<Any>,
_others: Box<dyn Any>,
}

impl<T:Send+'static> WorkerGuards<T> {
@@ -9,7 +9,7 @@ pub struct Registry<Id> {
/// A worker-specific identifier.
id: Id,
/// A map from names to typed loggers.
map: HashMap<String, (Box<Any>, Box<Flush>)>,
map: HashMap<String, (Box<dyn Any>, Box<dyn Flush>)>,
/// An instant common to all logging statements.
time: Instant,
}
@@ -29,7 +29,7 @@ impl<Id: Clone+'static> Registry<Id> {
pub fn insert<T: 'static, F: FnMut(&Duration, &mut Vec<(Duration, Id, T)>)+'static>(
&mut self,
name: &str,
action: F) -> Option<Box<Any>>
action: F) -> Option<Box<dyn Any>>
{
let logger = Logger::<T, Id>::new(self.time.clone(), self.id.clone(), action);
self.insert_logger(name, logger)
@@ -39,7 +39,7 @@ impl<Id: Clone+'static> Registry<Id> {
pub fn insert_logger<T: 'static>(
&mut self,
name: &str,
logger: Logger<T, Id>) -> Option<Box<Any>>
logger: Logger<T, Id>) -> Option<Box<dyn Any>>
{
self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
}
@@ -50,7 +50,7 @@ impl<Id: Clone+'static> Registry<Id> {
/// communicate that the stream is closed to any consumers. If a binding is not removed,
/// then the stream cannot be complete as in principle anyone could acquire a handle to
/// the logger and start further logging.
pub fn remove(&mut self, name: &str) -> Option<Box<Any>> {
pub fn remove(&mut self, name: &str) -> Option<Box<dyn Any>> {
self.map.remove(name).map(|x| x.0)
}

@@ -89,7 +89,7 @@ impl<Id> Flush for Registry<Id> {
pub struct Logger<T, E> {
id: E,
time: Instant, // common instant used for all loggers.
action: Rc<RefCell<FnMut(&Duration, &mut Vec<(Duration, E, T)>)>>, // action to take on full log buffers.
action: Rc<RefCell<dyn FnMut(&Duration, &mut Vec<(Duration, E, T)>)>>, // action to take on full log buffers.
buffer: Rc<RefCell<Vec<(Duration, E, T)>>>, // shared buffer; not obviously best design.
}

@@ -58,8 +58,8 @@ impl<D, F: FnMut(&D)->u64> Exchange<D, F> {
impl<T: Eq+Data+Clone, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
// TODO: The closure in the type prevents us from naming it.
// Could specialize `ExchangePusher` to a time-free version.
type Pusher = Box<Push<Bundle<T, D>>>;
type Puller = Box<Pull<Bundle<T, D>>>;
type Pusher = Box<dyn Push<Bundle<T, D>>>;
type Puller = Box<dyn Pull<Bundle<T, D>>>;
fn connect<A: AsWorker>(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, D>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
@@ -11,7 +11,7 @@ use crate::communication::Push;
/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
pub struct Tee<T: 'static, D: 'static> {
buffer: Vec<D>,
shared: Rc<RefCell<Vec<Box<Push<Bundle<T, D>>>>>>,
shared: Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>,
}

impl<T: Data, D: Data> Push<Bundle<T, D>> for Tee<T, D> {
@@ -60,7 +60,7 @@ impl<T, D> Clone for Tee<T, D> {

/// A shared list of `Box<Push>` used to add `Push` implementors.
pub struct TeeHelper<T, D> {
shared: Rc<RefCell<Vec<Box<Push<Bundle<T, D>>>>>>
shared: Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>
}

impl<T, D> TeeHelper<T, D> {
@@ -54,7 +54,7 @@ where
{
fn index(&self) -> usize { self.parent.index() }
fn peers(&self) -> usize { self.parent.peers() }
fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<Push<Message<D>>>>, Box<Pull<Message<D>>>) {
fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
self.parent.allocate(identifier, address)
}
fn pipeline<D: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<D>>, ThreadPuller<Message<D>>) {
@@ -97,7 +97,7 @@ where
self.subgraph.borrow_mut().connect(source, target);
}

fn add_operator_with_indices(&mut self, operator: Box<Operate<Self::Timestamp>>, local: usize, global: usize) {
fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize) {
self.subgraph.borrow_mut().add_child(operator, local, global);
}

@@ -38,7 +38,7 @@ pub trait Scope: ScopeParent {
fn add_edge(&self, source: Source, target: Target);

/// Adds a child `Operate` to the builder's scope. Returns the new child's index.
fn add_operator(&mut self, operator: Box<Operate<Self::Timestamp>>) -> usize {
fn add_operator(&mut self, operator: Box<dyn Operate<Self::Timestamp>>) -> usize {
let index = self.allocate_operator_index();
let global = self.new_identifier();
self.add_operator_with_indices(operator, index, global);
@@ -56,15 +56,15 @@ pub trait Scope: ScopeParent {
///
/// This is used internally when there is a gap between allocate a child identifier and adding the
/// child, as happens in subgraph creation.
fn add_operator_with_index(&mut self, operator: Box<Operate<Self::Timestamp>>, index: usize) {
fn add_operator_with_index(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, index: usize) {
let global = self.new_identifier();
self.add_operator_with_indices(operator, index, global);
}

/// Adds a child `Operate` to the builder's scope using supplied indices.
///
/// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging.
fn add_operator_with_indices(&mut self, operator: Box<Operate<Self::Timestamp>>, local: usize, global: usize);
fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize);

/// Creates a dataflow subgraph.
///
@@ -276,7 +276,7 @@ pub fn execute_from_args<I, T, F>(iter: I, func: F) -> Result<WorkerGuards<T>,St
/// })
/// }).unwrap();
/// ```
pub fn execute_from<A, T, F>(builders: Vec<A>, others: Box<::std::any::Any>, func: F) -> Result<WorkerGuards<T>,String>
pub fn execute_from<A, T, F>(builders: Vec<A>, others: Box<dyn (::std::any::Any)>, func: F) -> Result<WorkerGuards<T>,String>
where
A: AllocateBuilder+'static,
T: Send+'static,
@@ -14,8 +14,8 @@ pub type ProgressMsg<T> = Message<(usize, usize, ProgressVec<T>)>;
/// Manages broadcasting of progress updates to and receiving updates from workers.
pub struct Progcaster<T:Timestamp> {
to_push: Option<ProgressMsg<T>>,
pushers: Vec<Box<Push<ProgressMsg<T>>>>,
puller: Box<Pull<ProgressMsg<T>>>,
pushers: Vec<Box<dyn Push<ProgressMsg<T>>>>,
puller: Box<dyn Pull<ProgressMsg<T>>>,
/// Source worker index
source: usize,
/// Sequence number counter
@@ -122,7 +122,7 @@ where
}

/// Adds a new child to the subgraph.
pub fn add_child(&mut self, child: Box<Operate<TInner>>, index: usize, identifier: usize) {
pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
{
let mut child_path = self.path.clone();
child_path.push(index);
@@ -557,7 +557,7 @@ struct PerOperatorState<T: Timestamp> {
inputs: usize, // number of inputs to the operator
outputs: usize, // number of outputs from the operator

operator: Option<Box<Operate<T>>>,
operator: Option<Box<dyn Operate<T>>>,

edges: Vec<Vec<Target>>, // edges from the outputs of the operator

@@ -591,7 +591,7 @@ impl<T: Timestamp> PerOperatorState<T> {
}

pub fn new(
mut scope: Box<Operate<T>>,
mut scope: Box<dyn Operate<T>>,
index: usize,
mut _path: Vec<usize>,
identifier: usize,
@@ -35,7 +35,7 @@ pub trait AsWorker : Scheduler {
/// scheduled in response to the receipt of records on the channel.
/// Most commonly, this would be the address of the *target* of the
/// channel.
fn allocate<T: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>);
fn allocate<T: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
/// Constructs a pipeline channel from the worker to itself.
///
/// By default this method uses the native channel allocation mechanism, but the expectation is
@@ -73,7 +73,7 @@ pub struct Worker<A: Allocate> {
impl<A: Allocate> AsWorker for Worker<A> {
fn index(&self) -> usize { self.allocator.borrow().index() }
fn peers(&self) -> usize { self.allocator.borrow().peers() }
fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<Push<Message<D>>>>, Box<Pull<Message<D>>>) {
fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
if address.len() == 0 { panic!("Unacceptable address: Length zero"); }
let mut paths = self.paths.borrow_mut();
paths.insert(identifier, address.to_vec());
@@ -468,8 +468,8 @@ impl<A: Allocate> Clone for Worker<A> {
struct Wrapper {
logging: Option<TimelyLogger>,
identifier: usize,
operate: Option<Box<Schedule>>,
resources: Option<Box<Any>>,
operate: Option<Box<dyn Schedule>>,
resources: Option<Box<dyn Any>>,
channel_ids: Vec<usize>,
}

0 comments on commit 28d47d7

Please sign in to comment.
You can’t perform that action at this time.