diff --git a/src/communication/control_message_handler.rs b/src/communication/control_message_handler.rs
index 67289633..4c579ef3 100644
--- a/src/communication/control_message_handler.rs
+++ b/src/communication/control_message_handler.rs
@@ -1,6 +1,6 @@
 use std::collections::HashMap;
-use slog::Logger;
+use slog::{self, Logger};
 use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
 use crate::node::NodeId;
@@ -22,6 +22,7 @@ pub struct ControlMessageHandler {
     channels_to_nodes: HashMap<NodeId, UnboundedSender<ControlMessage>>,
 }
+#[allow(dead_code)]
 impl ControlMessageHandler {
     pub fn new(logger: Logger) -> Self {
         let (tx, rx) = mpsc::unbounded_channel();
@@ -43,9 +44,10 @@ impl ControlMessageHandler {
         tx: UnboundedSender<ControlMessage>,
     ) {
         if let Some(_) = self.channels_to_control_senders.insert(node_id, tx) {
-            error!(
+            slog::error!(
                 self.logger,
-                "ControlMessageHandler: overwrote channel to control sender for node {}", node_id
+                "ControlMessageHandler: overwrote channel to control sender for node {}",
+                node_id
             );
         }
     }
@@ -77,9 +79,10 @@ impl ControlMessageHandler {
         tx: UnboundedSender<ControlMessage>,
     ) {
         if let Some(_) = self.channels_to_control_receivers.insert(node_id, tx) {
-            error!(
+            slog::error!(
                 self.logger,
-                "ControlMessageHandler: overwrote channel to control receiver for node {}", node_id
+                "ControlMessageHandler: overwrote channel to control receiver for node {}",
+                node_id
             );
         }
     }
@@ -111,9 +114,10 @@ impl ControlMessageHandler {
         tx: UnboundedSender<ControlMessage>,
     ) {
         if let Some(_) = self.channels_to_data_senders.insert(node_id, tx) {
-            error!(
+            slog::error!(
                 self.logger,
-                "ControlMessageHandler: overwrote channel to data sender for node {}", node_id
+                "ControlMessageHandler: overwrote channel to data sender for node {}",
+                node_id
             );
         }
     }
@@ -145,9 +149,10 @@ impl ControlMessageHandler {
         tx: UnboundedSender<ControlMessage>,
     ) {
         if let Some(_) = self.channels_to_data_receivers.insert(node_id, tx) {
-            error!(
+            slog::error!(
                 self.logger,
-                "ControlMessageHandler: overwrote channel to data receiver for node {}", node_id
+                "ControlMessageHandler: overwrote channel to data receiver for node {}",
+                node_id
             );
         }
     }
diff --git a/src/communication/mod.rs b/src/communication/mod.rs
index d40a4a45..f252c924 100644
--- a/src/communication/mod.rs
+++ b/src/communication/mod.rs
@@ -1,31 +1,15 @@
-mod control_message_codec;
-mod control_message_handler;
-mod endpoints;
-mod errors;
-mod message_codec;
-mod serializable;
-
-pub mod pusher;
-pub mod receivers;
-pub mod senders;
-
-// Re-export structs as if they were defined here.
-pub use control_message_codec::ControlMessageCodec;
-pub use control_message_handler::ControlMessageHandler;
-pub(crate) use endpoints::{RecvEndpoint, SendEndpoint};
-pub use errors::{CodecError, CommunicationError, TryRecvError};
-pub use message_codec::MessageCodec;
-pub use pusher::{Pusher, PusherT};
-pub use serializable::Serializable;
+use std::{
+    fmt::Debug,
+    net::SocketAddr,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt};
 use bytes::BytesMut;
 use futures::future;
 use serde::{Deserialize, Serialize};
-use std::fmt::Debug;
-use std::net::SocketAddr;
-use std::sync::Arc;
-use std::time::{Duration, Instant};
+use slog;
 use tokio::{
     io::AsyncWriteExt,
     net::{TcpListener, TcpStream},
@@ -33,7 +17,33 @@ use tokio::{
     time::delay_for,
 };
-use crate::{dataflow::stream::StreamId, node::node::NodeId, OperatorId};
+use crate::{dataflow::stream::StreamId, node::NodeId, OperatorId};
+
+// Private submodules
+mod control_message_codec;
+mod control_message_handler;
+mod endpoints;
+mod errors;
+mod message_codec;
+mod serializable;
+
+// Crate-wide visible submodules
+pub(crate) mod pusher;
+pub(crate) mod receivers;
+pub(crate) mod senders;
+
+// Private imports
+use serializable::Serializable;
+
+// Module-wide exports
+pub(crate) use control_message_codec::ControlMessageCodec;
+pub(crate) use control_message_handler::ControlMessageHandler;
+pub(crate) use errors::{CodecError, CommunicationError, TryRecvError};
+pub(crate) use message_codec::MessageCodec;
+pub(crate) use pusher::{Pusher, PusherT};
+
+// Crate-wide exports
+pub(crate) use endpoints::{RecvEndpoint, SendEndpoint};
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum ControlMessage {
@@ -156,7 +166,7 @@ async fn connect_to_node(
     match stream.write(&buffer[..]).await {
         Ok(_) => return Ok(stream),
         Err(e) => {
-            error!(
+            slog::error!(
                 logger,
                 "Node {}: could not send node id to {}; error {}; retrying in 100 ms",
                 node_id,
@@ -171,7 +181,7 @@ async fn connect_to_node(
     // Only print connection errors every 1s.
     let now = Instant::now();
     if now.duration_since(last_err_msg_time) >= Duration::from_secs(1) {
-        error!(
+        slog::error!(
             logger,
             "Node {}: could not connect to {}; error {}; retrying",
             node_id,
@@ -220,7 +230,7 @@ async fn read_node_id(
     match stream.read_exact(&mut buffer).await {
         Ok(n) => n,
         Err(e) => {
-            error!(logger, "failed to read from socket; err = {:?}", e);
+            slog::error!(logger, "failed to read from socket; err = {:?}", e);
             return Err(e);
         }
     };
diff --git a/src/communication/receivers.rs b/src/communication/receivers.rs
index 43259da7..60ebfa23 100644
--- a/src/communication/receivers.rs
+++ b/src/communication/receivers.rs
@@ -16,7 +16,7 @@ use crate::{
         InterProcessMessage, MessageCodec, PusherT,
     },
     dataflow::stream::StreamId,
-    node::node::NodeId,
+    node::NodeId,
     scheduler::endpoints_manager::ChannelsToReceivers,
 };
diff --git a/src/communication/senders.rs b/src/communication/senders.rs
index a5af44d9..bb848ee0 100644
--- a/src/communication/senders.rs
+++ b/src/communication/senders.rs
@@ -15,7 +15,7 @@ use crate::communication::{
     CommunicationError, ControlMessage, ControlMessageCodec, ControlMessageHandler,
     InterProcessMessage, MessageCodec,
 };
-use crate::node::node::NodeId;
+use crate::node::NodeId;
 use crate::scheduler::endpoints_manager::ChannelsToSenders;
 #[allow(dead_code)]
diff --git a/src/dataflow/graph/mod.rs b/src/dataflow/graph/mod.rs
index 760518a7..e2594475 100644
--- a/src/dataflow/graph/mod.rs
+++ b/src/dataflow/graph/mod.rs
@@ -7,14 +7,20 @@ use crate::{
     scheduler::channel_manager::ChannelManager,
 };
+// Private submodules
+mod edge;
+mod graph;
+mod vertex;
+
+// Public submodules
 pub mod default_graph;
-pub mod edge;
-pub mod graph;
-pub mod vertex;
-pub use edge::{Channel, ChannelMetadata, StreamMetadata};
+// Crate-wide exports
+pub(crate) use edge::{Channel, ChannelMetadata, StreamMetadata};
+pub(crate) use vertex::{DriverMetadata, OperatorMetadata, Vertex};
+
+// Public exports
 pub use graph::Graph;
-pub use vertex::{DriverMetadata, OperatorMetadata, Vertex};
 pub trait OperatorRunner: 'static
diff --git a/src/dataflow/message.rs b/src/dataflow/message.rs
index 16513165..a11e5874 100644
--- a/src/dataflow/message.rs
+++ b/src/dataflow/message.rs
@@ -1,6 +1,7 @@
+use std::{cmp::Ordering, fmt::Debug};
+
+use abomonation_derive::*;
 use serde::{Deserialize, Serialize};
-use std::cmp::Ordering;
-use std::fmt::Debug;
 /// Trait for valid message data. The data must be clonable, sendable between threads and
 /// serializable.
diff --git a/src/dataflow/mod.rs b/src/dataflow/mod.rs
index c7384175..fbcfa72a 100644
--- a/src/dataflow/mod.rs
+++ b/src/dataflow/mod.rs
@@ -1,4 +1,4 @@
-// Export the modules to be visible outside of the dataflow module.
+// Public submodules
 pub mod callback_builder;
 pub mod graph;
 pub mod message;
@@ -7,13 +7,14 @@ pub mod operators;
 pub mod state;
 pub mod stream;
-// Re-export structs as if they were defined here.
+// Crate-wide exports
+pub(crate) use stream::EventMakerT;
+
+// Public exports
 pub use message::{Data, Message, Timestamp, TimestampedData};
 pub use operator::{Operator, OperatorConfig};
 pub use state::State;
-pub use stream::{
-    EventMakerT, LoopStream, ReadStream, StatefulReadStream, WriteStream, WriteStreamError,
-};
+pub use stream::{LoopStream, ReadStream, StatefulReadStream, WriteStream};
 #[cfg(test)]
 mod tests {
diff --git a/src/dataflow/operators/mod.rs b/src/dataflow/operators/mod.rs
index 03d9a1fc..32dc2966 100644
--- a/src/dataflow/operators/mod.rs
+++ b/src/dataflow/operators/mod.rs
@@ -1,7 +1,9 @@
-pub mod join_operator;
-pub mod map_operator;
-pub mod source_operator;
+// Private submodules
+mod join_operator;
+mod map_operator;
+mod source_operator;
+// Public exports
 pub use crate::dataflow::operators::join_operator::JoinOperator;
 pub use crate::dataflow::operators::map_operator::MapOperator;
 pub use crate::dataflow::operators::source_operator::SourceOperator;
diff --git a/src/dataflow/stream/ingest_stream.rs b/src/dataflow/stream/ingest_stream.rs
index e3a8245c..327eab00 100644
--- a/src/dataflow/stream/ingest_stream.rs
+++ b/src/dataflow/stream/ingest_stream.rs
@@ -7,12 +7,12 @@ use std::{
 use serde::Deserialize;
 use crate::{
-    dataflow::{graph::default_graph, Data, Message, WriteStreamError},
+    dataflow::{graph::default_graph, Data, Message},
     node::NodeId,
     scheduler::channel_manager::ChannelManager,
 };
-use super::{StreamId, WriteStream, WriteStreamT};
+use super::{errors::WriteStreamError, StreamId, WriteStream, WriteStreamT};
 pub struct IngestStream<D>
 where
diff --git a/src/dataflow/stream/mod.rs b/src/dataflow/stream/mod.rs
index d576507f..fa68751d 100644
--- a/src/dataflow/stream/mod.rs
+++ b/src/dataflow/stream/mod.rs
@@ -5,17 +5,23 @@ use crate::{
     node::operator_event::OperatorEvent,
 };
+// Private submodules
+mod extract_stream;
+mod ingest_stream;
+mod internal_read_stream;
+mod internal_stateful_read_stream;
+mod loop_stream;
+mod read_stream;
+mod stateful_read_stream;
+mod write_stream;
+
+// Public submodules
 pub mod errors;
-pub mod extract_stream;
-pub mod ingest_stream;
-pub mod internal_read_stream;
-pub mod internal_stateful_read_stream;
-pub mod loop_stream;
-pub mod read_stream;
-pub mod stateful_read_stream;
-pub mod write_stream;
-
-pub use errors::WriteStreamError;
+
+// Private imports
+use errors::WriteStreamError;
+
+// Public exports
 pub use extract_stream::ExtractStream;
 pub use ingest_stream::IngestStream;
 pub use internal_read_stream::InternalReadStream;
@@ -27,7 +33,7 @@ pub use write_stream::WriteStream;
 pub type StreamId = crate::Uuid;
-pub trait EventMakerT {
+pub(crate) trait EventMakerT {
     type EventDataType: Data;
     /// Returns the id of the stream.
diff --git a/src/dataflow/stream/write_stream.rs b/src/dataflow/stream/write_stream.rs
index bcf4d38c..05457983 100644
--- a/src/dataflow/stream/write_stream.rs
+++ b/src/dataflow/stream/write_stream.rs
@@ -4,10 +4,10 @@ use serde::Deserialize;
 use crate::{
     communication::{Pusher, SendEndpoint},
-    dataflow::{Data, Message, Timestamp, WriteStreamError},
+    dataflow::{Data, Message, Timestamp},
 };
-use super::{StreamId, WriteStreamT};
+use super::{errors::WriteStreamError, StreamId, WriteStreamT};
 // TODO: refactor with internal write stream
 #[derive(Clone)]
diff --git a/src/lib.rs b/src/lib.rs
index 908b5f4b..030266cd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -33,17 +33,13 @@ #![feature(box_into_pin)]
 #![feature(vec_remove_item)]
-extern crate abomonation;
-#[macro_use]
-extern crate abomonation_derive;
-extern crate bincode;
-extern crate clap;
-#[macro_use]
-extern crate lazy_static;
-#[macro_use]
-pub extern crate slog;
-extern crate slog_term;
-pub extern crate tokio;
+pub use ::slog;
+pub use ::tokio;
+
+use abomonation_derive::*;
+use clap;
+use lazy_static::lazy_static;
+use slog_term;
 // Libraries used in this file.
 use clap::{App, Arg};
@@ -54,15 +50,20 @@ use slog_term::term_full;
 use std::{cell::RefCell, fmt};
 use uuid;
-// Export the modules to be visible outside of the library.
-pub mod communication;
+// Private submodules
 pub mod configuration;
-pub mod dataflow;
-pub mod node;
 #[cfg(feature = "python")]
 pub mod python;
+
+// Crate-wide visible submodules
+
+// Public submodules
+pub mod communication;
+pub mod dataflow;
+pub mod node;
 pub mod scheduler;
+// Public exports
 pub use configuration::Configuration;
 pub use dataflow::OperatorConfig;
@@ -423,7 +424,7 @@ pub fn reset() {
 lazy_static! {
     static ref TERMINAL_LOGGER: Logger =
-        Logger::root(std::sync::Mutex::new(term_full()).fuse(), o!());
+        Logger::root(std::sync::Mutex::new(term_full()).fuse(), slog::o!());
 }
 pub fn get_terminal_logger() -> slog::Logger {
diff --git a/src/node/lattice.rs b/src/node/lattice.rs
index 97a98049..2647c41f 100644
--- a/src/node/lattice.rs
+++ b/src/node/lattice.rs
@@ -119,7 +119,7 @@ impl PartialOrd for RunnableEvent {
 /// # Example
 /// The below example shows how to insert events into the Lattice and retrieve runnable events from
 /// the lattice.
-/// ```
+/// ```ignore
 /// use erdos::node::{operator_event::OperatorEvent, lattice::ExecutionLattice};
 /// use erdos::dataflow::Timestamp;
 /// use futures::executor::block_on;
diff --git a/src/node/mod.rs b/src/node/mod.rs
index 318a12a9..6ec6a766 100644
--- a/src/node/mod.rs
+++ b/src/node/mod.rs
@@ -1,8 +1,12 @@
-// Modules visible outside of the node module.
-pub mod node;
-pub mod operator_event;
+// Private submodules
+mod lattice;
+mod node;
+
+// Crate-wide visible submodules
+pub(crate) mod operator_event;
+
+// Public submodules
 pub mod operator_executor;
-pub mod lattice;
-// Re-export structs as if they were defined here.
+// Public exports
 pub use node::{Node, NodeHandle, NodeId};
diff --git a/src/node/node.rs b/src/node/node.rs
index bcb104d6..574a8155 100644
--- a/src/node/node.rs
+++ b/src/node/node.rs
@@ -1,10 +1,12 @@
-use futures::future;
-use futures_util::stream::StreamExt;
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
     thread,
 };
+
+use futures::future;
+use futures_util::stream::StreamExt;
+use slog;
 use tokio::{
     net::TcpStream,
     runtime::Builder,
@@ -72,7 +74,7 @@ impl Node {
     ///
     /// The method never returns.
     pub fn run(&mut self) {
-        debug!(self.config.logger, "Node {}: running", self.id);
+        slog::debug!(self.config.logger, "Node {}: running", self.id);
         // Build a runtime with n threads.
         let mut runtime = Builder::new()
             .threaded_scheduler()
@@ -82,7 +84,7 @@ impl Node {
             .build()
             .unwrap();
         runtime.block_on(self.async_run());
-        debug!(self.config.logger, "Node {}: finished running", self.id);
+        slog::debug!(self.config.logger, "Node {}: finished running", self.id);
     }
     /// Runs an ERDOS node in a seperate OS thread.
@@ -117,7 +119,7 @@ impl Node {
         *started = true;
         cvar.notify_all();
-        debug!(self.config.logger, "Node {}: done initializing.", self.id);
+        slog::debug!(self.config.logger, "Node {}: done initializing.", self.id);
     }
     /// Splits a vector of TCPStreams into `DataSender`s and `DataReceiver`s.
@@ -241,9 +243,10 @@ impl Node {
     }
     async fn broadcast_local_operators_initialized(&mut self) -> Result<(), String> {
-        debug!(
+        slog::debug!(
             self.config.logger,
-            "Node {}: initialized all operators on this node.", self.id
+            "Node {}: initialized all operators on this node.",
+            self.id
         );
         self.control_handler
            .broadcast_to_nodes(ControlMessage::AllOperatorsInitializedOnNode(self.id))
@@ -305,9 +308,11 @@ impl Node {
             .name
             .clone()
             .unwrap_or_else(|| format!("{}", operator_info.id));
-        debug!(
+        slog::debug!(
             self.config.logger,
-            "Node {}: starting operator {}", self.id, name
+            "Node {}: starting operator {}",
+            self.id,
+            name
         );
         let channel_manager_copy = Arc::clone(&channel_manager);
         let operator_tx_copy = operator_tx.clone();
@@ -387,32 +392,33 @@ impl Node {
            control_senders_fut,
            control_recvs_fut
        ) {
-            error!(
+            slog::error!(
                logger,
-                "Non-fatal network communication error; this should not happen! {:?}", e
+                "Non-fatal network communication error; this should not happen! {:?}",
+                e
            );
        }
        tokio::select! {
-            Err(e) = ops_fut => error!(
+            Err(e) = ops_fut => slog::error!(
                logger,
                "Error running operators on node {:?}: {:?}", self.id, e
            ),
-            _ = shutdown_fut => debug!(logger, "Node {}: shutting down", self.id),
+            _ = shutdown_fut => slog::debug!(logger, "Node {}: shutting down", self.id),
        }
    } else {
        tokio::select! {
-            Err(e) = senders_fut => error!(logger, "Error with data senders: {:?}", e),
-            Err(e) = recvs_fut => error!(logger, "Error with data receivers: {:?}", e),
-            Err(e) = control_senders_fut => error!(logger, "Error with control senders: {:?}", e),
-            Err(e) = control_recvs_fut => error!(
+            Err(e) = senders_fut => slog::error!(logger, "Error with data senders: {:?}", e),
+            Err(e) = recvs_fut => slog::error!(logger, "Error with data receivers: {:?}", e),
+            Err(e) = control_senders_fut => slog::error!(logger, "Error with control senders: {:?}", e),
+            Err(e) = control_recvs_fut => slog::error!(
                self.config.logger,
                "Error with control receivers: {:?}", e
            ),
-            Err(e) = ops_fut => error!(
+            Err(e) = ops_fut => slog::error!(
                logger,
                "Error running operators on node {:?}: {:?}", self.id, e
            ),
-            _ = shutdown_fut => debug!(logger, "Node {}: shutting down", self.id),
+            _ = shutdown_fut => slog::debug!(logger, "Node {}: shutting down", self.id),
        }
    }
 }
diff --git a/src/python/mod.rs b/src/python/mod.rs
index a07c51b5..14e942cb 100644
--- a/src/python/mod.rs
+++ b/src/python/mod.rs
@@ -1,7 +1,6 @@
-use pyo3::{exceptions, prelude::*, types::*};
-
 use std::sync::{Arc, Mutex};
+use pyo3::{exceptions, prelude::*, types::*};
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use crate::{
@@ -11,21 +10,21 @@ use crate::{
         WriteStream,
     },
     node::{
-        node::NodeHandle,
         operator_executor::{OperatorExecutor, OperatorExecutorStream, OperatorExecutorStreamT},
-        Node, NodeId,
+        Node, NodeHandle, NodeId,
     },
     scheduler::channel_manager::ChannelManager,
     Configuration, Uuid,
 };
+// Private submodules
 mod py_message;
 mod py_stream;
+// Private imports
+use py_message::PyMessage;
 use py_stream::{PyExtractStream, PyIngestStream, PyLoopStream, PyReadStream, PyWriteStream};
-pub(crate) use py_message::PyMessage;
-
 #[pymodule]
 fn internal(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::()?;
@@ -348,7 +347,7 @@ impl Operator for PyOperator {
 }
 #[pyclass]
-pub(crate) struct PyNodeHandle {
+struct PyNodeHandle {
     node_handle: Option<NodeHandle>,
 }
diff --git a/src/python/py_stream/mod.rs b/src/python/py_stream/mod.rs
index 672b8906..6096d995 100644
--- a/src/python/py_stream/mod.rs
+++ b/src/python/py_stream/mod.rs
@@ -2,12 +2,14 @@ use pyo3::{exceptions, prelude::*};
 use crate::dataflow::StatefulReadStream;
+// Private submodules
 mod py_extract_stream;
 mod py_ingest_stream;
 mod py_loop_stream;
 mod py_read_stream;
 mod py_write_stream;
+// Public exports
 pub use py_extract_stream::PyExtractStream;
 pub use py_ingest_stream::PyIngestStream;
 pub use py_loop_stream::PyLoopStream;
diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs
index 471d2016..0af0dac9 100644
--- a/src/scheduler/mod.rs
+++ b/src/scheduler/mod.rs
@@ -1,12 +1,14 @@
 use crate::dataflow::graph::{Channel, Graph, Vertex};
-// Export the modules to be visible outside of the scheduler module.
+// Crate-wide visible submodules
+pub(crate) mod endpoints_manager;
+
+// Public exports
 pub mod channel_manager;
-pub mod endpoints_manager;
 /// Schedules a dataflow graph. Assigns operators to nodes and updates channels.
 /// After running this method, there should be no unscheduled channels remaining.
-pub fn schedule(graph: &Graph) -> Graph {
+pub(crate) fn schedule(graph: &Graph) -> Graph {
     let mut scheduled_graph = graph.clone();
     for stream in scheduled_graph.get_streams_ref_mut() {
         let source_node_id = match stream.get_source() {
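Note on the logging change visible throughout this diff: the crate drops `#[macro_use] pub extern crate slog` in favor of re-exporting the dependency (`pub use ::slog;`, `pub use ::tokio;`) and invoking the macros by path (`slog::error!`, `slog::debug!`, `slog::o!`). A minimal sketch of how downstream code would pick up the re-exported macros after this change — assuming the crate is consumed under the name `erdos` and using the `get_terminal_logger` helper shown in the `lib.rs` hunk above; this is illustrative only and not part of the diff:

```rust
// Hypothetical downstream usage of the re-exported slog crate.
// No direct slog dependency is required; the macro is invoked by path,
// mirroring the slog::debug!/slog::error! calls introduced in this patch.
use erdos::slog;

fn main() {
    let logger = erdos::get_terminal_logger();
    slog::info!(logger, "node {} started", 0);
}
```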