Skip to content

Commit

Permalink
Reorganize modules to restrict visibility (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
pschafhalter committed Jun 17, 2020
1 parent 6368637 commit 68e74c5
Show file tree
Hide file tree
Showing 18 changed files with 164 additions and 119 deletions.
23 changes: 14 additions & 9 deletions src/communication/control_message_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use slog::Logger;
use slog::{self, Logger};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

use crate::node::NodeId;
Expand All @@ -22,6 +22,7 @@ pub struct ControlMessageHandler {
channels_to_nodes: HashMap<NodeId, UnboundedSender<ControlMessage>>,
}

#[allow(dead_code)]
impl ControlMessageHandler {
pub fn new(logger: Logger) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
Expand All @@ -43,9 +44,10 @@ impl ControlMessageHandler {
tx: UnboundedSender<ControlMessage>,
) {
if let Some(_) = self.channels_to_control_senders.insert(node_id, tx) {
error!(
slog::error!(
self.logger,
"ControlMessageHandler: overwrote channel to control sender for node {}", node_id
"ControlMessageHandler: overwrote channel to control sender for node {}",
node_id
);
}
}
Expand Down Expand Up @@ -77,9 +79,10 @@ impl ControlMessageHandler {
tx: UnboundedSender<ControlMessage>,
) {
if let Some(_) = self.channels_to_control_receivers.insert(node_id, tx) {
error!(
slog::error!(
self.logger,
"ControlMessageHandler: overwrote channel to control receiver for node {}", node_id
"ControlMessageHandler: overwrote channel to control receiver for node {}",
node_id
);
}
}
Expand Down Expand Up @@ -111,9 +114,10 @@ impl ControlMessageHandler {
tx: UnboundedSender<ControlMessage>,
) {
if let Some(_) = self.channels_to_data_senders.insert(node_id, tx) {
error!(
slog::error!(
self.logger,
"ControlMessageHandler: overwrote channel to data sender for node {}", node_id
"ControlMessageHandler: overwrote channel to data sender for node {}",
node_id
);
}
}
Expand Down Expand Up @@ -145,9 +149,10 @@ impl ControlMessageHandler {
tx: UnboundedSender<ControlMessage>,
) {
if let Some(_) = self.channels_to_data_receivers.insert(node_id, tx) {
error!(
slog::error!(
self.logger,
"ControlMessageHandler: overwrote channel to data receiver for node {}", node_id
"ControlMessageHandler: overwrote channel to data receiver for node {}",
node_id
);
}
}
Expand Down
64 changes: 37 additions & 27 deletions src/communication/mod.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,49 @@
mod control_message_codec;
mod control_message_handler;
mod endpoints;
mod errors;
mod message_codec;
mod serializable;

pub mod pusher;
pub mod receivers;
pub mod senders;

// Re-export structs as if they were defined here.
pub use control_message_codec::ControlMessageCodec;
pub use control_message_handler::ControlMessageHandler;
pub(crate) use endpoints::{RecvEndpoint, SendEndpoint};
pub use errors::{CodecError, CommunicationError, TryRecvError};
pub use message_codec::MessageCodec;
pub use pusher::{Pusher, PusherT};
pub use serializable::Serializable;
use std::{
fmt::Debug,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};

use byteorder::{ByteOrder, NetworkEndian, WriteBytesExt};
use bytes::BytesMut;
use futures::future;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use slog;
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
prelude::*,
time::delay_for,
};

use crate::{dataflow::stream::StreamId, node::node::NodeId, OperatorId};
use crate::{dataflow::stream::StreamId, node::NodeId, OperatorId};

// Private submodules
mod control_message_codec;
mod control_message_handler;
mod endpoints;
mod errors;
mod message_codec;
mod serializable;

// Crate-wide visible submodules
pub(crate) mod pusher;
pub(crate) mod receivers;
pub(crate) mod senders;

// Private imports
use serializable::Serializable;

// Module-wide exports
pub(crate) use control_message_codec::ControlMessageCodec;
pub(crate) use control_message_handler::ControlMessageHandler;
pub(crate) use errors::{CodecError, CommunicationError, TryRecvError};
pub(crate) use message_codec::MessageCodec;
pub(crate) use pusher::{Pusher, PusherT};

// Crate-wide exports
pub(crate) use endpoints::{RecvEndpoint, SendEndpoint};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlMessage {
Expand Down Expand Up @@ -156,7 +166,7 @@ async fn connect_to_node(
match stream.write(&buffer[..]).await {
Ok(_) => return Ok(stream),
Err(e) => {
error!(
slog::error!(
logger,
"Node {}: could not send node id to {}; error {}; retrying in 100 ms",
node_id,
Expand All @@ -171,7 +181,7 @@ async fn connect_to_node(
// Only print connection errors every 1s.
let now = Instant::now();
if now.duration_since(last_err_msg_time) >= Duration::from_secs(1) {
error!(
slog::error!(
logger,
"Node {}: could not connect to {}; error {}; retrying",
node_id,
Expand Down Expand Up @@ -220,7 +230,7 @@ async fn read_node_id(
match stream.read_exact(&mut buffer).await {
Ok(n) => n,
Err(e) => {
error!(logger, "failed to read from socket; err = {:?}", e);
slog::error!(logger, "failed to read from socket; err = {:?}", e);
return Err(e);
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/communication/receivers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
InterProcessMessage, MessageCodec, PusherT,
},
dataflow::stream::StreamId,
node::node::NodeId,
node::NodeId,
scheduler::endpoints_manager::ChannelsToReceivers,
};

Expand Down
2 changes: 1 addition & 1 deletion src/communication/senders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::communication::{
CommunicationError, ControlMessage, ControlMessageCodec, ControlMessageHandler,
InterProcessMessage, MessageCodec,
};
use crate::node::node::NodeId;
use crate::node::NodeId;
use crate::scheduler::endpoints_manager::ChannelsToSenders;

#[allow(dead_code)]
Expand Down
16 changes: 11 additions & 5 deletions src/dataflow/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@ use crate::{
scheduler::channel_manager::ChannelManager,
};

// Private submodules
mod edge;
mod graph;
mod vertex;

// Public submodules
pub mod default_graph;
pub mod edge;
pub mod graph;
pub mod vertex;

pub use edge::{Channel, ChannelMetadata, StreamMetadata};
// Crate-wide exports
pub(crate) use edge::{Channel, ChannelMetadata, StreamMetadata};
pub(crate) use vertex::{DriverMetadata, OperatorMetadata, Vertex};

// Public exports
pub use graph::Graph;
pub use vertex::{DriverMetadata, OperatorMetadata, Vertex};

pub trait OperatorRunner:
'static
Expand Down
5 changes: 3 additions & 2 deletions src/dataflow/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{cmp::Ordering, fmt::Debug};

use abomonation_derive::*;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::Debug;

/// Trait for valid message data. The data must be clonable, sendable between threads and
/// serializable.
Expand Down
11 changes: 6 additions & 5 deletions src/dataflow/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Export the modules to be visible outside of the dataflow module.
// Public submodules
pub mod callback_builder;
pub mod graph;
pub mod message;
Expand All @@ -7,13 +7,14 @@ pub mod operators;
pub mod state;
pub mod stream;

// Re-export structs as if they were defined here.
// Crate-wide exports
pub(crate) use stream::EventMakerT;

// Public exports
pub use message::{Data, Message, Timestamp, TimestampedData};
pub use operator::{Operator, OperatorConfig};
pub use state::State;
pub use stream::{
EventMakerT, LoopStream, ReadStream, StatefulReadStream, WriteStream, WriteStreamError,
};
pub use stream::{LoopStream, ReadStream, StatefulReadStream, WriteStream};

#[cfg(test)]
mod tests {
Expand Down
8 changes: 5 additions & 3 deletions src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod join_operator;
pub mod map_operator;
pub mod source_operator;
// Private submodules
mod join_operator;
mod map_operator;
mod source_operator;

// Public exports
pub use crate::dataflow::operators::join_operator::JoinOperator;
pub use crate::dataflow::operators::map_operator::MapOperator;
pub use crate::dataflow::operators::source_operator::SourceOperator;
4 changes: 2 additions & 2 deletions src/dataflow/stream/ingest_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use std::{
use serde::Deserialize;

use crate::{
dataflow::{graph::default_graph, Data, Message, WriteStreamError},
dataflow::{graph::default_graph, Data, Message},
node::NodeId,
scheduler::channel_manager::ChannelManager,
};

use super::{StreamId, WriteStream, WriteStreamT};
use super::{errors::WriteStreamError, StreamId, WriteStream, WriteStreamT};

pub struct IngestStream<D>
where
Expand Down
28 changes: 17 additions & 11 deletions src/dataflow/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@ use crate::{
node::operator_event::OperatorEvent,
};

// Private submodules
mod extract_stream;
mod ingest_stream;
mod internal_read_stream;
mod internal_stateful_read_stream;
mod loop_stream;
mod read_stream;
mod stateful_read_stream;
mod write_stream;

// Public submodules
pub mod errors;
pub mod extract_stream;
pub mod ingest_stream;
pub mod internal_read_stream;
pub mod internal_stateful_read_stream;
pub mod loop_stream;
pub mod read_stream;
pub mod stateful_read_stream;
pub mod write_stream;

pub use errors::WriteStreamError;

// Private imports
use errors::WriteStreamError;

// Public exports
pub use extract_stream::ExtractStream;
pub use ingest_stream::IngestStream;
pub use internal_read_stream::InternalReadStream;
Expand All @@ -27,7 +33,7 @@ pub use write_stream::WriteStream;

pub type StreamId = crate::Uuid;

pub trait EventMakerT {
pub(crate) trait EventMakerT {
type EventDataType: Data;

/// Returns the id of the stream.
Expand Down
4 changes: 2 additions & 2 deletions src/dataflow/stream/write_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use serde::Deserialize;

use crate::{
communication::{Pusher, SendEndpoint},
dataflow::{Data, Message, Timestamp, WriteStreamError},
dataflow::{Data, Message, Timestamp},
};

use super::{StreamId, WriteStreamT};
use super::{errors::WriteStreamError, StreamId, WriteStreamT};

// TODO: refactor with internal write stream
#[derive(Clone)]
Expand Down
33 changes: 17 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@
#![feature(box_into_pin)]
#![feature(vec_remove_item)]

extern crate abomonation;
#[macro_use]
extern crate abomonation_derive;
extern crate bincode;
extern crate clap;
#[macro_use]
extern crate lazy_static;
#[macro_use]
pub extern crate slog;
extern crate slog_term;
pub extern crate tokio;
pub use ::slog;
pub use ::tokio;

use abomonation_derive::*;
use clap;
use lazy_static::lazy_static;
use slog_term;

// Libraries used in this file.
use clap::{App, Arg};
Expand All @@ -54,15 +50,20 @@ use slog_term::term_full;
use std::{cell::RefCell, fmt};
use uuid;

// Export the modules to be visible outside of the library.
pub mod communication;
// Private submodules
pub mod configuration;
pub mod dataflow;
pub mod node;
#[cfg(feature = "python")]
pub mod python;

// Crate-wide visible submodules

// Public submodules
pub mod communication;
pub mod dataflow;
pub mod node;
pub mod scheduler;

// Public exports
pub use configuration::Configuration;
pub use dataflow::OperatorConfig;

Expand Down Expand Up @@ -423,7 +424,7 @@ pub fn reset() {

lazy_static! {
static ref TERMINAL_LOGGER: Logger =
Logger::root(std::sync::Mutex::new(term_full()).fuse(), o!());
Logger::root(std::sync::Mutex::new(term_full()).fuse(), slog::o!());
}

pub fn get_terminal_logger() -> slog::Logger {
Expand Down
2 changes: 1 addition & 1 deletion src/node/lattice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl PartialOrd for RunnableEvent {
/// # Example
/// The below example shows how to insert events into the Lattice and retrieve runnable events from
/// the lattice.
/// ```
/// ```ignore
/// use erdos::node::{operator_event::OperatorEvent, lattice::ExecutionLattice};
/// use erdos::dataflow::Timestamp;
/// use futures::executor::block_on;
Expand Down
Loading

0 comments on commit 68e74c5

Please sign in to comment.