Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hopping window #196

Merged
merged 7 commits into from Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,8 @@
__Add any extra change notes here and we'll put them in the release
notes on GitHub when we make a new release.__

- Added `HoppingWindowConfig` for windowing operators.

## 0.15.0

- Fixes issue with multi-worker recovery. If the cluster crashed
Expand Down
1 change: 1 addition & 0 deletions pysrc/bytewax/window.py
Expand Up @@ -72,5 +72,6 @@
EventClockConfig,
SystemClockConfig,
TumblingWindowConfig,
HoppingWindowConfig,
WindowConfig,
)
67 changes: 55 additions & 12 deletions pytests/test_window.py
Expand Up @@ -4,7 +4,52 @@
from bytewax.execution import run_main
from bytewax.inputs import TestingBuilderInputConfig
from bytewax.outputs import TestingOutputConfig
from bytewax.window import EventClockConfig, TumblingWindowConfig
from bytewax.window import EventClockConfig, HoppingWindowConfig, TumblingWindowConfig


def test_hopping_window():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rewrote this test to make it a little easier (for me) to satisfy myself that the window was doing what I expected.

def test_hopping_window():
    """Items land in every hopping window that overlaps them.

    Windows are 10 seconds long and a new one starts every 5 seconds,
    so relative to `start_at` they cover [0, 10), [5, 15), [10, 20)
    and [15, 25) seconds. An item that arrives after the watermark has
    passed its window is expected to be absent from the output.
    """
    start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)

    flow = Dataflow()

    def gen():
        yield ("ALL", {"time": start_at + timedelta(seconds=1), "val": "a"})
        yield ("ALL", {"time": start_at + timedelta(seconds=4), "val": "b"})
        yield ("ALL", {"time": start_at + timedelta(seconds=8), "val": "c"})
        yield ("ALL", {"time": start_at + timedelta(seconds=12), "val": "d"})
        yield ("ALL", {"time": start_at + timedelta(seconds=13), "val": "e"})
        yield ("ALL", {"time": start_at + timedelta(seconds=14), "val": "f"})
        yield ("ALL", {"time": start_at + timedelta(seconds=16), "val": "g"})
        # This is late, and should be ignored
        yield ("ALL", {"time": start_at + timedelta(seconds=1), "val": "h"})

    flow.input("inp", TestingBuilderInputConfig(gen))

    # Event time comes from the item itself; no extra waiting for stragglers.
    clock_config = EventClockConfig(
        lambda e: e["time"], wait_for_system_duration=timedelta(0)
    )
    window_config = HoppingWindowConfig(
        length=timedelta(seconds=10), start_at=start_at, offset=timedelta(seconds=5)
    )

    def add(acc, x):
        # Collect the labels so the assertion shows exactly which items
        # ended up in which window.
        acc.append(x["val"])
        return acc

    flow.fold_window("sum", clock_config, window_config, list, add)

    out = []
    flow.capture(TestingOutputConfig(out))

    run_main(flow)

    assert sorted(out) == sorted(
        [
            ("ALL", ["a", "b", "c"]),  # [0, 10)
            ("ALL", ["c", "d", "e", "f"]),  # [5, 15)
            ("ALL", ["d", "e", "f", "g"]),  # [10, 20)
            ("ALL", ["g"]),  # [15, 25)
        ]
    )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think #196 (comment) is a little more readable. Not for this PR, but probably all the window tests should look like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's much more clear, I'm changing it

start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)

flow = Dataflow()

def gen():
yield ("ALL", {"time": start_at + timedelta(seconds=1), "val": "a"})
yield ("ALL", {"time": start_at + timedelta(seconds=4), "val": "b"})
yield ("ALL", {"time": start_at + timedelta(seconds=8), "val": "c"})
yield ("ALL", {"time": start_at + timedelta(seconds=12), "val": "d"})
yield ("ALL", {"time": start_at + timedelta(seconds=13), "val": "e"})
yield ("ALL", {"time": start_at + timedelta(seconds=14), "val": "f"})
yield ("ALL", {"time": start_at + timedelta(seconds=16), "val": "g"})
# This is late, and should be ignored
yield ("ALL", {"time": start_at + timedelta(seconds=1), "val": "h"})

flow.input("inp", TestingBuilderInputConfig(gen))

clock_config = EventClockConfig(
lambda e: e["time"], wait_for_system_duration=timedelta(0)
)
window_config = HoppingWindowConfig(
length=timedelta(seconds=10), start_at=start_at, offset=timedelta(seconds=5)
)

def add(acc, x):
acc.append(x["val"])
return acc

flow.fold_window("sum", clock_config, window_config, list, add)

out = []
flow.capture(TestingOutputConfig(out))

run_main(flow)
assert sorted(out) == sorted(
[
("ALL", ["a", "b", "c"]),
("ALL", ["c", "d", "e", "f"]),
("ALL", ["d", "e", "f", "g"]),
("ALL", ["g"]),
]
)


def test_tumbling_window():
Expand All @@ -13,12 +58,12 @@ def test_tumbling_window():
flow = Dataflow()

def gen():
yield ("ALL", {"time": start_at, "val": 1})
yield ("ALL", {"time": start_at + timedelta(seconds=4), "val": 1})
yield ("ALL", {"time": start_at + timedelta(seconds=8), "val": 1})
yield ("ALL", {"time": start_at, "val": "a"})
yield ("ALL", {"time": start_at + timedelta(seconds=4), "val": "b"})
yield ("ALL", {"time": start_at + timedelta(seconds=8), "val": "c"})
# First 10 second window should close just before processing this item.
yield ("ALL", {"time": start_at + timedelta(seconds=12), "val": 1})
yield ("ALL", {"time": start_at + timedelta(seconds=16), "val": 1})
yield ("ALL", {"time": start_at + timedelta(seconds=12), "val": "d"})
yield ("ALL", {"time": start_at + timedelta(seconds=16), "val": "e"})

flow.input("inp", TestingBuilderInputConfig(gen))

Expand All @@ -30,16 +75,14 @@ def gen():
)

def add(acc, x):
if type(acc) == dict:
return acc["val"] + x["val"]
else:
return acc + x["val"]
acc.append(x["val"])
return acc

flow.reduce_window("sum", clock_config, window_config, add)
flow.fold_window("sum", clock_config, window_config, list, add)

out = []
flow.capture(TestingOutputConfig(out))

run_main(flow)

assert sorted(out) == sorted([("ALL", 3), ("ALL", 2)])
assert sorted(out) == sorted([("ALL", ["a", "b", "c"]), ("ALL", ["d", "e"])])
82 changes: 82 additions & 0 deletions src/macros.rs
Expand Up @@ -87,3 +87,85 @@ macro_rules! log_func {
&name[..name.len() - 3]
}};
}

#[macro_export]
/// Generates the `#[pymethods]` boilerplate for config classes exposed
/// to Python. This is needed mainly for pickling and unpickling.
///
/// The generated `impl` contains:
///
/// - `py_new`: the `#[new]` constructor; it takes every listed field
///   and returns `(Self, $parent {})`.
/// - `__getstate__`: a dict with a `"type"` entry (the struct name)
///   plus one entry per listed field.
/// - `__getnewargs__`: the placeholder values written after `=>`.
/// - `__setstate__`: overwrites every listed field from the dict.
///
/// Each field is cloned and converted with `IntoPy` when pickling and
/// read back with `extract()` when unpickling, so its type must
/// support those operations.
///
/// ```rust
/// // Example usage:
/// use bytewax::add_pymethods;
/// use chrono::Duration;
/// use pyo3::pyclass;
///
/// #[pyclass(module = "bytewax.window", subclass)]
/// #[pyo3(text_signature = "()")]
/// struct WindowConfig;
///
/// #[pyclass(module = "bytewax.window", extends = WindowConfig)]
/// #[derive(Clone)]
/// struct HoppingWindowConfig {
///     length: Duration,
/// }
///
/// add_pymethods!(
///     HoppingWindowConfig,
///     parent: WindowConfig,
///     py_args: (length,),
///     args {
///         length: Duration => Duration::zero()
///     }
/// );
/// ```
macro_rules! add_pymethods {(
    $struct:ident,
    parent: $parent:ident,
    py_args: $py_args:tt,
    args { $($arg:ident: $arg_type:ty => $default:expr),* }
) => {
    #[pyo3::pymethods]
    impl $struct {
        #[new]
        #[args $py_args ]
        pub(crate) fn py_new($($arg: $arg_type),*) -> (Self, $parent) {
            (Self { $($arg),* }, $parent {})
        }

        /// Return a representation of this class as a PyDict.
        fn __getstate__(&self) -> std::collections::HashMap<&str, pyo3::Py<pyo3::PyAny>> {
            // Paths are resolved where the macro is invoked, so spell
            // out `pyo3::Python` instead of requiring every caller to
            // import `Python` just for this expansion.
            pyo3::Python::with_gil(|py| {
                std::collections::HashMap::from([
                    ("type", pyo3::IntoPy::into_py(stringify!($struct), py)),
                    $((stringify!($arg), pyo3::IntoPy::into_py(self.$arg.clone(), py))),*
                ])
            })
        }

        /// Egregious hack because pickling assumes the type has "empty"
        /// mutable objects.
        ///
        /// Pickle always calls `__new__(*__getnewargs__())` but notice we
        /// don't have access to the pickled field values yet, so we
        /// have to pass in placeholder values that will be
        /// overwritten by `__setstate__()` shortly.
        #[allow(unused_parens)]
        fn __getnewargs__(&self) -> ($($arg_type,) *) {
            ($($default,) *)
        }

        /// Unpickle from a PyDict
        fn __setstate__(&mut self, state: &pyo3::PyAny) -> pyo3::PyResult<()> {
            let dict: &pyo3::types::PyDict = state.downcast()?;
            // This is like crate::common::pickle_extract
            // Duplicated here so that we can doctest this macro
            // without making `pickle_extract` public.
            $(
                self.$arg = dict
                    .get_item(stringify!($arg))
                    .ok_or_else(|| pyo3::exceptions::PyValueError::new_err(
                        format!("bad pickle contents for {}: {}", stringify!($arg), dict)
                    ))?
                    .extract()?;
            )*
            Ok(())
        }
    }
}}
166 changes: 166 additions & 0 deletions src/window/hopping_window.rs
@@ -0,0 +1,166 @@
use std::collections::HashMap;

use chrono::{DateTime, Duration, Utc};
use pyo3::{pyclass, Python};

use crate::{add_pymethods, common::StringResult, window::WindowConfig};

use super::{Builder, InsertError, StateBytes, WindowBuilder, WindowKey, Windower};

/// Hopping windows of fixed duration.
/// If offset == length, you get tumbling windows.
/// If offset < length, windows overlap.
/// If offset > length, there will be holes between windows.
///
/// Args:
///
/// length (datetime.timedelta): Length of window.
///
/// offset (datetime.timedelta): Time between the starts of two
/// consecutive windows.
///
/// start_at (datetime.datetime): Instant of the first window. You
/// can use this to align all windows to an hour, for example.
/// Defaults to system time of dataflow start.
///
/// Returns:
///
/// Config object. Pass this as the `window_config` parameter to
/// your windowing operator.
#[pyclass(module="bytewax.window", extends=WindowConfig)]
#[derive(Clone)]
pub(crate) struct HoppingWindowConfig {
// Each field is readable from Python through `#[pyo3(get)]`.
#[pyo3(get)]
pub(crate) length: Duration,
#[pyo3(get)]
pub(crate) offset: Duration,
// `None` means "use the current time when the windower is built".
#[pyo3(get)]
pub(crate) start_at: Option<DateTime<Utc>>,
}

// Generate the Python constructor and the pickling methods. The values
// after `=>` are only the placeholders handed out by `__getnewargs__`;
// `__setstate__` replaces them with the pickled values afterwards.
add_pymethods!(
HoppingWindowConfig,
parent: WindowConfig,
py_args: (length, offset, start_at = "None"),
args {
length: Duration => Duration::zero(),
offset: Duration => Duration::zero(),
start_at: Option<DateTime<Utc>> => None
}
);

impl WindowBuilder for HoppingWindowConfig {
    /// Turn this config into the builder closure returned by
    /// `HoppingWindower::builder`.
    ///
    /// `start_at` falls back to the current time when it was not set.
    ///
    /// Returns an error if `offset` is shorter than one millisecond
    /// (including zero and negative values): the windower divides by the
    /// offset in whole milliseconds, so such a value would otherwise
    /// panic on the first item.
    fn build(&self, _py: Python) -> StringResult<Builder> {
        // Checked here rather than in the constructor because unpickling
        // first constructs the object with zero placeholders and only then
        // restores the real values.
        if self.offset.num_milliseconds() <= 0 {
            return Err(format!(
                "HoppingWindowConfig `offset` must be at least 1 millisecond, got {}",
                self.offset
            ));
        }

        Ok(Box::new(HoppingWindower::builder(
            self.length,
            self.offset,
            self.start_at.unwrap_or_else(Utc::now),
        )))
    }
}

/// Windower for fixed-length windows whose starts are spaced `offset`
/// apart, counted from `start_at`.
pub(crate) struct HoppingWindower {
// Duration of every window.
length: Duration,
// Distance between the starts of two consecutive windows.
offset: Duration,
// Start of window 0.
start_at: DateTime<Utc>,
// Close time of every window that has received an item and has not
// been drained yet. This map is the state that gets snapshotted.
close_times: HashMap<WindowKey, DateTime<Utc>>,
}

impl HoppingWindower {
    /// Returns a closure that creates a boxed windower with the given
    /// parameters.
    ///
    /// The closure receives an optional snapshot; when one is present the
    /// recorded close times are deserialized from it, otherwise the
    /// windower starts with no open windows.
    pub(crate) fn builder(
        length: Duration,
        offset: Duration,
        start_at: DateTime<Utc>,
    ) -> impl Fn(Option<StateBytes>) -> Box<dyn Windower> {
        move |resume_snapshot| {
            let close_times = match resume_snapshot {
                Some(bytes) => StateBytes::de::<HashMap<WindowKey, DateTime<Utc>>>(bytes),
                None => HashMap::default(),
            };
            let windower = Self {
                length,
                offset,
                start_at,
                close_times,
            };
            Box::new(windower)
        }
    }

    /// Remember when the window `key` closes.
    ///
    /// A window must always be registered with the same close time;
    /// anything else means the boundary arithmetic is broken, so panic.
    fn add_close_time(&mut self, key: WindowKey, window_end: DateTime<Utc>) {
        match self.close_times.entry(key) {
            std::collections::hash_map::Entry::Occupied(known) => {
                assert!(
                    known.get() == &window_end,
                    "HoppingWindower is not generating consistent boundaries"
                );
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(window_end);
            }
        }
    }
}

impl Windower for HoppingWindower {
    /// Assign an item to every window whose time span contains it.
    ///
    /// Window `i` (for `i >= 0`) starts at `start_at + i * offset`, with
    /// `offset` truncated to whole milliseconds, and lasts for `length`;
    /// the end is exclusive.
    ///
    /// The result has one entry per window containing `item_time`, in
    /// ascending window order:
    ///
    /// - `Ok(key)` if the watermark has not passed the end of the window.
    ///   The window's close time is recorded as a side effect.
    /// - `Err(InsertError::Late(key))` if the watermark is already past
    ///   the end of the window.
    ///
    /// An item that lies before `start_at`, or in a hole between windows
    /// (`offset > length`), belongs to no window and yields an empty
    /// vector.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is shorter than one millisecond.
    fn insert(
        &mut self,
        watermark: &DateTime<Utc>,
        item_time: &DateTime<Utc>,
    ) -> Vec<Result<WindowKey, InsertError>> {
        let offset_ms = self.offset.num_milliseconds();
        // Fail with a readable message instead of a bare division by zero.
        assert!(
            offset_ms > 0,
            "HoppingWindower offset must be at least 1 millisecond"
        );
        let length_ms = self.length.num_milliseconds();
        let item_ms = (*item_time - self.start_at).num_milliseconds();
        let start_at = self.start_at;
        let length = self.length;

        // The last window that can contain the item is the one that
        // started most recently at or before it.
        let last_window = item_ms.div_euclid(offset_ms);
        // The first candidate depends on the window length, not on the
        // watermark: with `length > 2 * offset` many earlier windows are
        // still open. The extra millisecond absorbs sub-millisecond
        // truncation; the exact check below drops any surplus candidate.
        let first_window = (item_ms - length_ms - 1).div_euclid(offset_ms).max(0);

        (first_window..=last_window)
            .filter_map(|i| {
                let window_start = start_at + Duration::milliseconds(i * offset_ms);
                let window_end = window_start + length;
                // Windows that do not contain the item are not reported
                // at all; "late" is reserved for windows that would have
                // held the item had it arrived in time.
                if *item_time < window_start || *item_time >= window_end {
                    return None;
                }

                let key = WindowKey(i);
                if *watermark <= window_end {
                    self.add_close_time(key, window_end);
                    Some(Ok(key))
                } else {
                    Some(Err(InsertError::Late(key)))
                }
            })
            .collect()
    }

    /// Look at the current watermark, determine which windows are now
    /// closed, return them, and remove them from internal state.
    ///
    /// A window is closed once its close time is strictly before the
    /// watermark. The order of the returned keys is unspecified.
    fn drain_closed(&mut self, watermark: &DateTime<Utc>) -> Vec<WindowKey> {
        let mut closed_ids = Vec::new();

        self.close_times.retain(|id, close_at| {
            let still_open = *close_at >= *watermark;
            if !still_open {
                closed_ids.push(*id);
            }
            still_open
        });

        closed_ids
    }

    /// True when no window is waiting to be closed.
    fn is_empty(&self) -> bool {
        self.close_times.is_empty()
    }

    /// Earliest close time among the open windows, if there is one.
    fn next_close(&self) -> Option<DateTime<Utc>> {
        self.close_times.values().copied().min()
    }

    /// Serialize the map of open windows and their close times.
    fn snapshot(&self) -> StateBytes {
        StateBytes::ser::<HashMap<WindowKey, DateTime<Utc>>>(&self.close_times)
    }
}
5 changes: 5 additions & 0 deletions src/window/mod.rs
Expand Up @@ -43,8 +43,10 @@ use timely::dataflow::Scope;
use timely::{Data, ExchangeData};

pub(crate) mod clock;
pub(crate) mod hopping_window;
pub(crate) mod tumbling_window;

use self::hopping_window::HoppingWindowConfig;
use self::tumbling_window::TumblingWindowConfig;
use clock::{event_time_clock::EventClockConfig, system_clock::SystemClockConfig, ClockConfig};

Expand Down Expand Up @@ -98,6 +100,8 @@ impl PyConfigClass<Box<dyn WindowBuilder>> for Py<WindowConfig> {
fn downcast(&self, py: Python) -> StringResult<Box<dyn WindowBuilder>> {
if let Ok(conf) = self.extract::<TumblingWindowConfig>(py) {
Ok(Box::new(conf))
} else if let Ok(conf) = self.extract::<HoppingWindowConfig>(py) {
Ok(Box::new(conf))
} else {
let pytype = self.as_ref(py).get_type();
Err(format!("Unknown window_config type: {pytype}",))
Expand Down Expand Up @@ -477,5 +481,6 @@ pub(crate) fn register(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<SystemClockConfig>()?;
m.add_class::<WindowConfig>()?;
m.add_class::<TumblingWindowConfig>()?;
m.add_class::<HoppingWindowConfig>()?;
Ok(())
}