APEXMALHAR-2085: Operator supporting the Beam concepts of windowing, watermarks, triggering and accumulation #319
348a570 to 7fb7543
 *
 */
@InterfaceStability.Evolving
public interface Accumulation<InputT, AccumT, OutputT>
Might be helpful to add an example where AccumT is different from OutputT
done
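For illustration, a running average is one case where AccumT and OutputT naturally differ: the accumulated state is a (sum, count) pair while the output is a single number. The sketch below is not the code from this PR. The nested `Accumulation` interface is a hypothetical stand-in with assumed method names, and `SumAndCount` and `Average` are made up for the example.

```java
public class AverageExample {
    // Hypothetical stand-in for the interface under review; method names are assumptions.
    interface Accumulation<InputT, AccumT, OutputT> {
        AccumT defaultAccumulatedValue();
        AccumT accumulate(AccumT accumulated, InputT input);
        AccumT merge(AccumT left, AccumT right);
        OutputT getOutput(AccumT accumulated);
    }

    // Mutable accumulated state: this is AccumT.
    static final class SumAndCount {
        double sum;
        long count;
    }

    // InputT = Double, AccumT = SumAndCount, OutputT = Double.
    static final class Average implements Accumulation<Double, SumAndCount, Double> {
        @Override
        public SumAndCount defaultAccumulatedValue() {
            return new SumAndCount();
        }

        @Override
        public SumAndCount accumulate(SumAndCount accumulated, Double input) {
            accumulated.sum += input;
            accumulated.count++;
            return accumulated;
        }

        @Override
        public SumAndCount merge(SumAndCount left, SumAndCount right) {
            left.sum += right.sum;
            left.count += right.count;
            return left;
        }

        @Override
        public Double getOutput(SumAndCount accumulated) {
            // The output is derived from the state, not identical to it.
            return accumulated.count == 0 ? 0.0 : accumulated.sum / accumulated.count;
        }
    }

    public static void main(String[] args) {
        Average avg = new Average();
        SumAndCount acc = avg.defaultAccumulatedValue();
        for (double v : new double[] {1.0, 2.0, 6.0}) {
            acc = avg.accumulate(acc, v);
        }
        System.out.println(avg.getOutput(acc)); // average of 1, 2 and 6
    }
}
```

Keeping the pair as the accumulated type is what makes `merge` possible: two averages cannot be combined correctly without their counts.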
@davidyan74 Your change causes some compile errors in the Stream API and in the classes that depend on it. Shall I resolve those issues?
@siyuanh Yes, please do so. Also review my changes and see if you're okay with them.
@siyuanh Note that I removed the support for count-based windows (for now), because the concepts of watermarks, timestamps, and early/late triggers don't apply to them. Let me know if you want to discuss this further.
7129580 to b1c6b0c
@@ -94,6 +94,18 @@
<artifactId>cglib</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<!-- required by twitter demo -->
Why this dependency?
20fc1d3 to de84a96
 * This interface describes the individual window.
 */
@InterfaceStability.Evolving
public interface Window
I think Window could provide just one method, isWithinWindow(long time). GlobalWindow can then simply return true. In SessionWindow, instead of using a static method to merge windows, we can just call extendWindow(). The current Window interface seems to assume a fixed boundary (it could be the interface of FixedWindow), but neither GlobalWindow nor SessionWindow has a fixed boundary.
Session window can be extended, and two session windows can be merged into one.
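To make the distinction between extending and merging concrete, here is a minimal sketch. It is not the Window or SessionWindow code from this PR: the class, its fields, and the method names `extendTo`, `closeEnough` and `merge` are hypothetical, and the end bound is assumed to be exclusive.

```java
public class SessionWindowSketch {
    // Hypothetical session window covering [beginMillis, endMillis).
    static final class SessionWindow {
        final long beginMillis;
        final long endMillis; // exclusive (assumption)

        SessionWindow(long beginMillis, long endMillis) {
            this.beginMillis = beginMillis;
            this.endMillis = endMillis;
        }

        boolean isWithinWindow(long timestamp) {
            return timestamp >= beginMillis && timestamp < endMillis;
        }

        // Extending: stretch one window so that it covers a new timestamp.
        SessionWindow extendTo(long timestamp) {
            return new SessionWindow(Math.min(beginMillis, timestamp),
                                     Math.max(endMillis, timestamp + 1));
        }

        // True if the two windows overlap or are separated by at most gapMillis.
        boolean closeEnough(SessionWindow other, long gapMillis) {
            return beginMillis <= other.endMillis + gapMillis
                && other.beginMillis <= endMillis + gapMillis;
        }

        // Merging: combine two windows into one that spans both.
        SessionWindow merge(SessionWindow other) {
            return new SessionWindow(Math.min(beginMillis, other.beginMillis),
                                     Math.max(endMillis, other.endMillis));
        }
    }

    public static void main(String[] args) {
        SessionWindow a = new SessionWindow(0, 1000);
        SessionWindow b = new SessionWindow(1500, 2000);
        SessionWindow extended = a.extendTo(1200);
        System.out.println(extended.beginMillis + ".." + extended.endMillis);
        if (a.closeEnough(b, 600)) {
            SessionWindow merged = a.merge(b);
            System.out.println(merged.beginMillis + ".." + merged.endMillis);
        }
    }
}
```

Extension involves a single window and a tuple timestamp, while merging involves two existing windows, which is why an extend method alone would not cover a late tuple that bridges two sessions.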
5ac037b to 87af40c
long horizon = currentWatermark - allowedLatenessMillis;
if (allowedLatenessMillis >= 0) {
// purge window that are too late to accept any more input
dataStorage.removeUpTo(horizon);
Before removing from storage, should we check whether triggers have fired and the data has been sent downstream for windows that are too late?
good point, will make the changes
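One way to read the suggestion above is to give each expiring window a final chance to emit before it is dropped. The following is a rough sketch under stated assumptions, not the operator's actual code: `WindowedStore`, its `TreeMap` keyed by window end time, the `fired` set, and the inclusive horizon are all hypothetical, and string values stand in for accumulated data.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class PurgeSketch {
    // Hypothetical in-memory store keyed by window end time in milliseconds.
    static final class WindowedStore {
        private final TreeMap<Long, List<String>> dataByWindowEnd = new TreeMap<>();
        private final Set<Long> fired = new HashSet<>();
        final List<String> emitted = new ArrayList<>();

        void put(long windowEnd, String value) {
            dataByWindowEnd.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(value);
        }

        void markFired(long windowEnd) {
            fired.add(windowEnd);
        }

        int size() {
            return dataByWindowEnd.size();
        }

        void processWatermark(long currentWatermark, long allowedLatenessMillis) {
            if (allowedLatenessMillis < 0) {
                return; // assumption: a negative lateness means windows are never purged
            }
            long horizon = currentWatermark - allowedLatenessMillis;
            // Visit every window ending at or before the horizon (inclusive bound is an assumption).
            Iterator<Map.Entry<Long, List<String>>> it =
                dataByWindowEnd.headMap(horizon, true).entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<String>> entry = it.next();
                if (!fired.contains(entry.getKey())) {
                    emitted.addAll(entry.getValue()); // final firing before the purge
                }
                fired.remove(entry.getKey());
                it.remove(); // removes the entry from the backing TreeMap
            }
        }
    }

    public static void main(String[] args) {
        WindowedStore store = new WindowedStore();
        store.put(1000L, "a");
        store.put(5000L, "b");
        store.processWatermark(3000L, 1000L); // horizon = 2000
        System.out.println(store.emitted + " remaining=" + store.size());
    }
}
```

Iterating over the `headMap` view lets the emit and the removal happen in one pass, so no window can be dropped without having been checked.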
ac79487 to 5ea52ad
7b43f51 to 90b4c48
 *
 * @param <T> The type of the data that is stored per window
 *
 * TODO: Look at the possibility of integrating spillable data structure: https://issues.apache.org/jira/browse/APEXMALHAR-2026
This seems very similar to the managed state implementations. Can we also look at managed state directly before looking at Spillable data structures?
@bhupeshchawda We will for sure look at ManagedState.
d52c7b5 to 2d28dbd
@siyuanh Please review and merge.
@chinmaykolhatkar @brightchen @tweise If you are all OK with the change, I will merge it.
I would prefer a bit more clarity on the storage part. We already have ManagedState and the in-memory implementation of the spillable data structures. Is there a reason why we don't use them here?
@siyuanh I agree with the changes, and this is good to go as a first version. Further changes can follow later.
2d28dbd to 6048a56
@tweise We don't have an implementation of SpillableByteMap in master yet. It looks like PR #324 from @ilooner has not provided that yet, but once we have an in-memory spillable map implementation, we will make another PR to replace the current in-memory implementation of WindowedStorage. This should not be a blocker for merging this PR.
@davidyan74 This PR may not be the place for it, but it would be good to have a discussion about which Spillable data structures will be needed. To me it looks like your WindowStorage interfaces are basically array list multimaps. If that's the case, I would recommend you make a note on your Storage interfaces that they are temporary and will likely disappear very soon, since SpillableDatastructures already provides that abstraction. As long as we do those things, I think it would be acceptable to pull this in and make the improvements in the next round of changes.
@davidyan74 In fact, as a next step it may be a good exercise to validate the Spillable Data Structures interfaces by using the InMemorySpillableArrayListMultimap, which is already in master.
6048a56 to 0a2f979
@ilooner Thanks, I will take a look at InMemorySpillableArrayListMultimap.
@tweise I have already marked the WindowedStorage interface Unstable and added a note in the javadoc that the WindowedStorage interfaces may change or go away entirely soon, and that we plan to integrate with spillable data structures in the very near future. Let me know if you think this PR is good to go.
addressing PR comments
Added trigger unit tests
Fixed sliding window bug, added unit tests
Fixed session window bug; added more unit tests
Gives window a chance to trigger before purging because of lateness
added more unit tests
Process watermark only at end window
Renamed WatermarkOpt to Type, and fixed bug when window in retraction storage is not dropped when it's too late
added copyright rat check
changed WindowOption from an abstract class to an interface
Use mutable objects for accumulated types
Retraction storage needs to be based on the output, not accumulated type
Added more unit tests
added support of fixed lateness in case watermark is not available from upstream
changed name from fixed lateness to fixed watermark
support inheriting windowed tuple from upstream when window option is not given
0a2f979 to 7a77274
I think this PR is good to go, but we need to see the follow-up work for scalable checkpointing; benchmarking would also be good.
Filed two JIRAs: one to revisit the Accumulation interface (https://issues.apache.org/jira/browse/APEXMALHAR-2145) and one to benchmark the new operators and evaluate the boxing impact (https://issues.apache.org/jira/browse/APEXMALHAR-2146).
This review-only PR contains the interfaces and a first rough draft of the implementation of the operator that supports the concepts of windowing, watermarks, triggering and accumulation mode specified by the Apache Beam API.
When you review this PR, please:
thank you.