Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LINQ] Add timestamp join #215

Merged
merged 9 commits into from
Mar 26, 2022
20 changes: 12 additions & 8 deletions erdos/src/dataflow/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ where

/// Get the current state attached to the operator.
pub fn get_current_state(&mut self) -> Option<&mut S::Item> {
let timestamp = self.get_timestamp().clone();
self.state.at(&timestamp)
self.state.at(&self.timestamp)
}

/// Get the past state attached to the operator.
Expand Down Expand Up @@ -279,8 +278,7 @@ where

/// Get the current state attached to the operator.
pub fn get_current_state(&mut self) -> Option<&mut S::Item> {
let timestamp = self.get_timestamp().clone();
self.state.at(&timestamp)
self.state.at(&self.timestamp)
}

/// Get the past state attached to the operator.
Expand Down Expand Up @@ -417,8 +415,7 @@ where

/// Get the current state attached to the operator.
pub fn get_current_state(&mut self) -> Option<&mut S::Item> {
let timestamp = self.get_timestamp().clone();
self.state.at(&timestamp)
self.state.at(&self.timestamp)
}

/// Get the past state attached to the operator.
Expand All @@ -433,6 +430,14 @@ where
}
}

/// [Experimental] Returns a mutable reference to the state attached to the operator.
///
/// This hands out the whole state object (`S`) rather than the per-timestamp item returned by
/// `get_current_state`. It is only visible within the crate.
///
/// Used to garbage-collect time-versioned state in
/// [`TimestampJoin`](crate::dataflow::operators::TimestampJoin).
pub(crate) fn get_state_mut(&mut self) -> &mut S {
self.state
}

/// Get the timestamp of the last committed state.
pub fn get_last_committed_timestamp(&self) -> Timestamp {
self.state.last_committed_timestamp()
Expand Down Expand Up @@ -570,8 +575,7 @@ where

/// Get the current state attached to the operator.
pub fn get_current_state(&mut self) -> Option<&mut S::Item> {
let timestamp = self.get_timestamp().clone();
self.state.at(&timestamp)
self.state.at(&self.timestamp)
}

/// Get the past state attached to the operator.
Expand Down
296 changes: 122 additions & 174 deletions erdos/src/dataflow/operators/join.rs
Original file line number Diff line number Diff line change
@@ -1,198 +1,146 @@
use std::{
cmp::Reverse,
collections::{BinaryHeap, HashMap},
marker::PhantomData,
sync::{Arc, RwLock},
};

use serde::Deserialize;

use crate::dataflow::{
message::Message, stream::WriteStreamT, Data, Operator, OperatorConfig, ReadStream, Timestamp,
WriteStream,
context::TwoInOneOutContext,
message::Message,
operator::{OperatorConfig, TwoInOneOut},
state::TimeVersionedState,
stream::{OperatorStream, Stream, WriteStreamT},
Data,
};

/// Per-stream buffer used by the `JoinOperator` to hold messages until they are joined.
///
/// Messages are grouped by `Timestamp` in a `HashMap` guarded by an `RwLock`. A min-heap that
/// mirrors the map's keys makes it cheap to find the oldest buffered timestamps when cleaning.
/// Both collections live behind `Arc`s, so clones of a `StreamState` share the same storage.
///
/// Lock order: whenever both locks are held at once, `msgs` is acquired before `timestamps`.
/// Every method that needs both locks must follow this order so that clones used from different
/// threads cannot deadlock each other.
#[derive(Clone)]
struct StreamState<D: Data> {
    /// Buffered messages, grouped by the timestamp they were received with.
    msgs: Arc<RwLock<HashMap<Timestamp, Vec<D>>>>,
    /// A min-heap tracking the keys of `msgs`.
    timestamps: Arc<RwLock<BinaryHeap<Reverse<Timestamp>>>>,
}

impl<D: Data> StreamState<D> {
    /// Creates an empty state with no buffered messages.
    fn new() -> Self {
        Self {
            msgs: Arc::new(RwLock::new(HashMap::new())),
            timestamps: Arc::new(RwLock::new(BinaryHeap::new())),
        }
    }

    /// Buffers `msg` under `timestamp`.
    ///
    /// The first message seen for a timestamp also registers that timestamp in the min-heap, so
    /// the heap holds each key of the map exactly once.
    ///
    /// # Panics
    /// Panics if either lock is poisoned.
    fn add_msg(&mut self, timestamp: &Timestamp, msg: D) {
        let mut msgs = self.msgs.write().unwrap();
        match msgs.get_mut(timestamp) {
            Some(msg_vec) => msg_vec.push(msg),
            None => {
                // The timestamp is only cloned when a new entry has to be created.
                msgs.insert(timestamp.clone(), vec![msg]);
                self.timestamps
                    .write()
                    .unwrap()
                    .push(Reverse(timestamp.clone()));
            }
        }
    }

    /// Removes all messages buffered for timestamps up to and including `timestamp`.
    ///
    /// # Panics
    /// Panics if either lock is poisoned, or if the heap contains a timestamp that is missing
    /// from the map.
    fn clean_state(&self, timestamp: &Timestamp) {
        // Take both locks once, in the same order as `add_msg` (`msgs`, then `timestamps`).
        let mut msgs = self.msgs.write().unwrap();
        let mut timestamps = self.timestamps.write().unwrap();
        while timestamps.peek().map_or(false, |t| t.0 <= *timestamp) {
            let Reverse(t) = timestamps.pop().unwrap();
            msgs.remove(&t)
                .expect("StreamState: expected Timestamp to be present");
        }
    }

    /// Returns a copy of the messages buffered for `timestamp`, or `None` if there are none.
    ///
    /// # Panics
    /// Panics if the `msgs` lock is poisoned.
    fn get_state(&self, timestamp: &Timestamp) -> Option<Vec<D>> {
        self.msgs.read().unwrap().get(timestamp).cloned()
    }
}

/// An operator that joins two incoming streams of type D1 and D2 into a stream of type D3 using
/// the function provided.
/// Joins messages with matching timestamps from two different streams.
///
/// The following table provides an example of how the [`TimestampJoin`] processes data from two
/// streams:
///
/// | Timestamp | Left input | Right input | [`TimestampJoin`] output |
/// |-----------|------------|-------------|--------------------------------------------|
/// | 1 | a <br> b | 1 <br> 2 | (a, 1) <br> (a, 2) <br> (b, 1) <br> (b, 2) |
/// | 2 | c | | |
/// | 3 | | 3 | |
/// | 4 | d | 4 | (d, 4) |
///
/// # Example
/// The below example shows how to use a JoinOperator to sum two streams of incoming u32 messages,
/// and return them as u64 messages.
/// The following example shows how to use a [`TimestampJoin`] to join two streams.
///
/// ```
/// # use erdos::dataflow::{stream::IngestStream, operators::JoinOperator, OperatorConfig};
/// # use erdos::*;
/// # use erdos::dataflow::{
/// # stream::IngestStream,
/// # operator::OperatorConfig,
/// # operators::TimestampJoin,
/// # state::TimeVersionedState
/// # };
/// #
/// # let mut left_u32_stream = IngestStream::new(0);
/// # let mut right_u32_stream = IngestStream::new(0);
/// # let left_stream: IngestStream<String> = IngestStream::new();
/// # let right_stream: IngestStream<usize> = IngestStream::new();
/// #
/// // Add the joining function as an argument to the operator via the OperatorConfig.
/// let join_config = OperatorConfig::new()
/// .name("JoinOperator")
/// .arg(|left_data: Vec<u32>, right_data: Vec<u32>| -> u64 {
/// (left_data.iter().sum::<u32>() + right_data.iter().sum::<u32>()) as u64
/// });
/// let output_stream = connect_1_write!(
/// JoinOperator<u32, u32, u64>, join_config,left_u32_stream, right_u32_stream);
/// // Joins two streams of types String and usize
/// let joined_stream = erdos::connect_two_in_one_out(
/// TimestampJoin::new,
/// TimeVersionedState::new,
/// OperatorConfig::new().name("TimestampJoin"),
/// &left_stream,
/// &right_stream,
/// );
/// ```
pub struct JoinOperator<D1: Data, D2: Data, D3: Data> {
phantom_data: PhantomData<(D1, D2, D3)>,
}
pub struct TimestampJoin {}

impl<'a, D1: Data, D2: Data, D3: Data + Deserialize<'a>> JoinOperator<D1, D2, D3> {
/// Returns a new instance of the JoinOperator.
///
/// # Arguments
/// * `config` - An instance of OperatorConfig that provides the closure used to join items of
/// type Vec<D1> and Vec<D2> to a value of type D3.
/// * `input_stream_left` - Represents the incoming stream of messages of type D1.
/// * `input_stream_right` - Represents the incoming stream of messages of type D2.
/// * `output_stream` - Represents an outgoing stream of messages of type D3.
pub fn new<F: 'static + Clone + Fn(Vec<D1>, Vec<D2>) -> D3>(
config: OperatorConfig<F>,
input_stream_left: ReadStream<D1>,
input_stream_right: ReadStream<D2>,
output_stream: WriteStream<D3>,
) -> Self {
let name = match config.name {
Some(s) => s,
None => format!("JoinOperator {}", config.id),
};

// Package the state with the left stream and add a callback to the new stream.
let stateful_stream_left = input_stream_left.add_state(StreamState::<D1>::new());
stateful_stream_left.add_callback(Self::on_left_data_callback);

// Package the state with the right stream and add a callback to the new stream.
let stateful_stream_right = input_stream_right.add_state(StreamState::<D2>::new());
stateful_stream_right.add_callback(Self::on_right_data_callback);

let cb = config
.arg
.unwrap_or_else(|| panic!("{}: no join function provided", name));
stateful_stream_left
.add_read_stream(&stateful_stream_right)
.borrow_mut()
.add_write_stream(&output_stream)
.borrow_mut()
.add_watermark_callback(
move |t: &Timestamp,
left_state: &StreamState<D1>,
right_state: &StreamState<D2>,
write_stream: &mut WriteStream<D3>| {
Self::on_watermark_callback(t, left_state, right_state, write_stream, &cb)
},
);

Self {
phantom_data: PhantomData,
}
impl TimestampJoin {
pub fn new() -> Self {
Self {}
}
}

/// The function to be called when a message is received on the left input stream.
/// This callback adds the data received in the message to the state associated with the
/// stream.
fn on_left_data_callback(t: &Timestamp, msg: &D1, state: &mut StreamState<D1>) {
state.add_msg(t, msg.clone());
impl<T, U> TwoInOneOut<TimeVersionedState<(Vec<T>, Vec<U>)>, T, U, (T, U)> for TimestampJoin
where
T: Data + for<'a> Deserialize<'a>,
U: Data + for<'a> Deserialize<'a>,
{
fn on_left_data(
&mut self,
ctx: &mut TwoInOneOutContext<TimeVersionedState<(Vec<T>, Vec<U>)>, (T, U)>,
data: &T,
) {
let timestamp = ctx.get_timestamp().clone();
let (left_items, right_items) = ctx.get_current_state().unwrap();
left_items.push(data.clone());

let messages: Vec<_> = right_items
.iter()
.cloned()
.map(|right_item| Message::new_message(timestamp.clone(), (data.clone(), right_item)))
.collect();
for msg in messages {
ctx.get_write_stream().send(msg).unwrap();
}
pschafhalter marked this conversation as resolved.
Show resolved Hide resolved
}

/// The function to be called when a message is received on the right input stream.
/// This callback adds the data received in the message to the state associated with the
/// stream.
fn on_right_data_callback(t: &Timestamp, msg: &D2, state: &mut StreamState<D2>) {
state.add_msg(t, msg.clone());
fn on_right_data(
&mut self,
ctx: &mut TwoInOneOutContext<TimeVersionedState<(Vec<T>, Vec<U>)>, (T, U)>,
data: &U,
) {
let timestamp = ctx.get_timestamp().clone();
let (left_items, right_items) = ctx.get_current_state().unwrap();
right_items.push(data.clone());

let messages: Vec<_> = left_items
.iter()
.cloned()
.map(|left_item| Message::new_message(timestamp.clone(), (left_item, data.clone())))
.collect();
for msg in messages {
ctx.get_write_stream().send(msg).unwrap();
}
}

/// The function to be called when a watermark is received on both the left and the right
/// streams.
/// This callback uses the saved state from the two streams and joins them using the provided
/// closure.
fn on_watermark_callback<F: 'static + Clone + Fn(Vec<D1>, Vec<D2>) -> D3>(
t: &Timestamp,
left_state: &StreamState<D1>,
right_state: &StreamState<D2>,
write_stream: &mut WriteStream<D3>,
join_function: &F,
fn on_watermark(
&mut self,
ctx: &mut TwoInOneOutContext<TimeVersionedState<(Vec<T>, Vec<U>)>, (T, U)>,
) {
// Retrieve the state and run the given callback on it.
let left_state_t: Vec<D1> = left_state.get_state(t).unwrap();
let right_state_t: Vec<D2> = right_state.get_state(t).unwrap();
let result_t: D3 = join_function(left_state_t, right_state_t);

// Send the result on the write stream.
write_stream
.send(Message::new_message(t.clone(), result_t))
.expect("JoinOperator: error sending on write stream");

// Garbage collect all the data upto and including this timestamp.
left_state.clean_state(t);
right_state.clean_state(t);
let timestamp = ctx.get_timestamp().clone();
ctx.get_state_mut().evict_until(&timestamp);
}
}

pub fn connect(
_left_read_stream: &ReadStream<D1>,
_right_read_stream: &ReadStream<D2>,
) -> WriteStream<D3> {
WriteStream::new()
}
/// Extension trait for joining pairs of streams.
///
/// Names the operators using the names of the incoming streams.
///
/// `T` and `U` are the payload types of the two streams being joined; both must implement
/// `Data` and be deserializable for any lifetime.
pub trait Join<T, U>
where
T: Data + for<'a> Deserialize<'a>,
U: Data + for<'a> Deserialize<'a>,
{
/// Joins this stream with `other`, producing a stream of `(T, U)` pairs.
fn timestamp_join(&self, other: &dyn Stream<U>) -> OperatorStream<(T, U)>;
}

impl<'a, D1: Data, D2: Data, D3: Data + Deserialize<'a>> Operator for JoinOperator<D1, D2, D3> {}
impl<S, T, U> Join<T, U> for S
where
    S: Stream<T>,
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
{
    /// Connects a [`TimestampJoin`] operator between this stream and `other`, pairing up
    /// messages whose timestamps match.
    ///
    /// The operator is named `TimestampJoinOp_<left>_<right>`, where `<left>` and `<right>` are
    /// the names of this stream and of `other`.
    ///
    /// # Example
    ///
    /// ```
    /// # use erdos::dataflow::{stream::IngestStream, operators::Join};
    /// #
    /// # let left_stream: IngestStream<String> = IngestStream::new();
    /// # let right_stream: IngestStream<usize> = IngestStream::new();
    /// #
    /// let joined_stream = left_stream.timestamp_join(&right_stream);
    /// ```
    fn timestamp_join(&self, other: &dyn Stream<U>) -> OperatorStream<(T, U)> {
        let operator_name = format!("TimestampJoinOp_{}_{}", self.name(), other.name());
        let config = OperatorConfig::new().name(&operator_name);

        crate::connect_two_in_one_out(
            TimestampJoin::new,
            TimeVersionedState::new,
            config,
            self,
            other,
        )
    }
}
2 changes: 2 additions & 0 deletions erdos/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ pub mod ros;
// mod join_operator;
mod concat;
mod filter;
mod join;
mod map;
mod split;

// Public exports
// pub use crate::dataflow::operators::join_operator::JoinOperator;
pub use concat::{Concat, ConcatOperator};
pub use filter::{Filter, FilterOperator};
pub use join::{Join, TimestampJoin};
pub use map::{FlatMapOperator, Map};
pub use split::{Split, SplitOperator};