Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hopping window #196

Merged
merged 7 commits into from Feb 3, 2023
Merged

Hopping window #196

merged 7 commits into from Feb 3, 2023

Conversation

Psykopear
Copy link
Contributor

@Psykopear Psykopear commented Feb 1, 2023

Added a HoppingWindowConfig object.

I also refactored the Windower trait to autoimplement some methods since it was equal on both the WindowConfigs. Trait methods can still be overridden if we need a different behavior in the future.

I also added a new macro, add_pymethods, which alleviates the problem of having to copy paste a lot of code for the infamuous egregious hack. I also have a separate branch where I use the macro everywhere else in the codebase, but I'd like to merge this first (or revert it if we don't like the macro).

Fixes #172

@Psykopear Psykopear marked this pull request as ready for review February 1, 2023 15:30
Copy link
Contributor

@whoahbot whoahbot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

from bytewax.window import EventClockConfig, HoppingWindowConfig, TumblingWindowConfig


def test_hopping_window():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rewrote this test to make it a little easier (for me) to satisfy myself that the window was doing what I expected.

def test_hopping_window():
    start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)

    flow = Dataflow()

    def gen():
        yield ("ALL", {"time": start_at + timedelta(seconds=1), "val": "a"})
        yield ("ALL", {"time": start_at + timedelta(seconds=4), "val": "b"})
        yield ("ALL", {"time": start_at + timedelta(seconds=8), "val": "c"})
        yield ("ALL", {"time": start_at + timedelta(seconds=12), "val": "d"})
        yield ("ALL", {"time": start_at + timedelta(seconds=13), "val": "e"})
        yield ("ALL", {"time": start_at + timedelta(seconds=14), "val": "f"})
        yield ("ALL", {"time": start_at + timedelta(seconds=16), "val": "g"})
        # This is late, and should be ignored
        yield ("ALL", {"time": start_at + timedelta(seconds=1), "val": "h"})

    flow.input("inp", TestingBuilderInputConfig(gen))

    clock_config = EventClockConfig(
        lambda e: e["time"], wait_for_system_duration=timedelta(0)
    )
    window_config = HoppingWindowConfig(
        length=timedelta(seconds=10), start_at=start_at, offset=timedelta(seconds=5)
    )

    def add(acc, x):
        acc.append(x["val"])
        return acc

    flow.fold_window("sum", clock_config, window_config, list, add)

    out = []
    flow.capture(TestingOutputConfig(out))

    run_main(flow)
    print(out)

    assert sorted(out) == sorted(
        [
            ("ALL", ["a", "b", "c"]),
            ("ALL", ["c", "d", "e", "f"]),
            ("ALL", ["d", "e", "f", "g"]),
            ("ALL", ["g"]),
        ]
    )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think #196 (comment) is a little more readable. Not for this PR, but probably all the window tests should look like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's much more clear, I'm changing it

src/macros.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@davidselassie davidselassie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Thanks.

from bytewax.window import EventClockConfig, HoppingWindowConfig, TumblingWindowConfig


def test_hopping_window():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think #196 (comment) is a little more readable. Not for this PR, but probably all the window tests should look like this.

/// Look at the current watermark, determine which windows are now
/// closed, return them, and remove them from internal state.
fn drain_closed(&mut self, watermark: &DateTime<Utc>) -> Vec<WindowKey>;
fn drain_closed(&mut self, watermark: &DateTime<Utc>) -> Vec<WindowKey> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking with keeping these all abstract was that "sessionization" windowers might not have a fixed hash of closed times and would need a different data structure. Although there might be a way to keep using this. I'm happy for you to merge this and once we get to session windows we can figure out if it still works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I was thinking about this too when reading about other window types. This way I tied the data structure to the trait, which it wasn't before, so I'm probably going to revert/rethink this before merging

Co-authored-by: Dan Herrera <whoahbot@bytewax.io>
Signed-off-by: Federico Dolce <psykopear@gmail.com>
@Psykopear
Copy link
Contributor Author

Psykopear commented Feb 2, 2023

So, I also realized that in the end a monad is a monoid in the category of endofunctors a TumblingWindow is just a HoppingWindow with length==offset, so I refactored TumblingWindowConfig to be:

impl WindowBuilder for TumblingWindowConfig {
    fn build(&self, _py: Python) -> StringResult<Builder> {
        Ok(Box::new(super::hopping_window::HoppingWindower::builder(
            self.length,
            self.length,
            self.start_at.unwrap_or_else(Utc::now),
        )))
    }
}

@Psykopear Psykopear merged commit f6d09e7 into main Feb 3, 2023
@Psykopear Psykopear deleted the hopping-window branch February 3, 2023 08:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEATURE] Support hopping window
3 participants