Unable to create time based window using submission params #2494

Closed
Golfer474 opened this issue Jun 24, 2020 · 6 comments · Fixed by #2501

Comments

@Golfer474

Golfer474 commented Jun 24, 2020

The Java API [1] for creating a time-based window does not allow the use of submission-time parameters for defining the window length. Therefore it's impossible to define the window length dynamically from external configuration; instead, it must be defined at compile time. Is this by design, or is there a workaround?

[1] https://github.com/IBMStreams/streamsx.topology/blob/develop/java/src/com/ibm/streamsx/topology/TStream.java#L603

@markheger
Member

Providing the window size via a submission parameter is currently not supported in the topology API.

You want to write code like the following, right?

    Supplier<Long> someLong = topology.createSubmissionParameter("someLong", Long.class);
    TWindow<Double, ?> lastNSeconds = readings.last(someLong, TimeUnit.SECONDS);

@Golfer474
Author

@markheger Correct, that is the intention.

@ghost

ghost commented Jun 25, 2020

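Topology.createSubmissionParameter(...) creates a submission-time parameter whose value is available only at runtime. At declaration time, i.e. when you invoke TStream.last(...) to declare a window, the Supplier's get() method always returns null. So you cannot declare the window like this (TStream.last(long, TimeUnit) takes a primitive long, so unboxing the null value throws a NullPointerException):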

    Topology topo = new Topology("WinTest");
    TStream<Integer> data = topo.endlessSource(new Supplier<Integer>() {
        @Override
        public Integer get() { ... }
    });
    
    Supplier<Long> windowLength = topo.createSubmissionParameter("windowLength", 5L);
    // Does not work: windowLength.get() returns null here, at declaration time.
    TWindow<Integer, ?> lastTuples = data.last(windowLength.get(), TimeUnit.SECONDS);
    lastTuples.aggregate(new Function<List<Integer>, Integer>() {
        @Override
        public Integer apply(List<Integer> window) { ... }
    });

See Javadoc: http://ibmstreams.github.io/streamsx.topology/doc/releases/latest/javadoc/com/ibm/streamsx/topology/Topology.html#createSubmissionParameter-java.lang.String-java.lang.Class-
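The failure mode can be reproduced without the topology API. This sketch assumes, as stated above, that get() returns null at declaration time; declareWindow(long) is a hypothetical stand-in for TStream.last(long, TimeUnit), which likewise takes a primitive long.

```java
import java.util.function.Supplier;

class UnboxingDemo {
    // Hypothetical stand-in for TStream.last(long, TimeUnit): it takes a primitive long.
    static long declareWindow(long time) {
        return time;
    }

    // Returns true if passing supplier.get() to declareWindow(long) throws a NullPointerException.
    static boolean failsAtDeclaration(Supplier<Long> windowLength) {
        try {
            declareWindow(windowLength.get()); // auto-unboxing a null Long throws here
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

failsAtDeclaration(() -> null) returns true and failsAtDeclaration(() -> 5L) returns false, which is why a value known when the topology is built (such as a command-line argument) works while a submission parameter does not.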

One alternative (high effort) is to write a stateful transform function that keeps its own list of tuples. You must then evict older tuples from the list yourself, so that it holds only the tuples received within the window length. Inside such a function you can call Supplier.get() to obtain the submission-time value, because the function runs at runtime.

An example of stateful function logic is available here:
http://ibmstreams.github.io/streamsx.documentation/docs/4.1/java/java-appapi-devguide/ (scroll down to Transform: Keeping track of state across tuples)
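A minimal sketch of that high-effort option, in plain Java so that it runs on its own. Assumptions: the class name SlidingTimeWindow and its add(...) methods are hypothetical, java.util.function.Supplier stands in for the topology API's Supplier, and each tuple is timestamped with the time it is added. In a real topology this logic would live in a stateful (and serializable) function passed to TStream.transform(...), with windowLength being the Supplier returned by Topology.createSubmissionParameter(...).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not part of streamsx.topology: a hand-rolled sliding time window
// whose length comes from a Supplier that is only read at runtime.
class SlidingTimeWindow<T> {
    // A tuple together with the time (epoch millis) at which it was received.
    private static final class Entry<E> {
        final long receivedMillis;
        final E value;

        Entry(long receivedMillis, E value) {
            this.receivedMillis = receivedMillis;
            this.value = value;
        }
    }

    private final Supplier<Long> windowLength; // e.g. a submission parameter
    private final TimeUnit unit;
    private final Deque<Entry<T>> entries = new ArrayDeque<>(); // oldest tuple at the head

    SlidingTimeWindow(Supplier<Long> windowLength, TimeUnit unit) {
        this.windowLength = windowLength;
        this.unit = unit;
    }

    /** Adds a tuple received at nowMillis; returns the tuples still in the window, oldest first. */
    List<T> add(T tuple, long nowMillis) {
        // Read the length here, at runtime, when the submission-time value is available.
        long lengthMillis = unit.toMillis(windowLength.get());
        entries.addLast(new Entry<>(nowMillis, tuple));
        // Evict tuples older than the window length; assumes nowMillis never decreases.
        while (!entries.isEmpty() && nowMillis - entries.peekFirst().receivedMillis > lengthMillis) {
            entries.removeFirst();
        }
        List<T> window = new ArrayList<>();
        for (Entry<T> e : entries) {
            window.add(e.value);
        }
        return window;
    }

    /** Convenience overload using the wall clock, as a transform function would at runtime. */
    List<T> add(T tuple) {
        return add(tuple, System.currentTimeMillis());
    }
}
```

For example, with a 5-second length, tuples added at 0 s, 3 s, 6 s and 11 s leave only the tuples from 6 s and 11 s in the window after the last call. Since a submission parameter does not change during a job, caching the first get() result would work just as well as re-reading it per tuple.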

Another option (low effort) is to pass the window length as a command-line argument to the program that builds and submits your topology:

    public static void main(String[] args) {
        ...
        long windowLength = Long.parseLong(args[0]);
        TWindow<Integer, ?> lastTuples = data.last(windowLength, TimeUnit.SECONDS);
        ...
    }
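A slightly more defensive version of that argument handling, as a sketch: the helper name windowLengthSeconds, the fallback default, and the positivity check are assumptions, not part of the suggestion above.

```java
// Hypothetical helper for the command-line approach.
class WindowArgs {
    /**
     * Parses the window length (in seconds) from args[0], falling back to defaultSeconds
     * when no argument is given. Rejects non-positive lengths.
     */
    static long windowLengthSeconds(String[] args, long defaultSeconds) {
        if (args.length == 0) {
            return defaultSeconds;
        }
        long value = Long.parseLong(args[0]); // throws NumberFormatException for non-numeric input
        if (value <= 0) {
            throw new IllegalArgumentException("window length must be positive: " + value);
        }
        return value;
    }
}
```

In main, this would replace the direct Long.parseLong(args[0]) call, e.g. long windowLength = WindowArgs.windowLengthSeconds(args, 5L); before declaring data.last(windowLength, TimeUnit.SECONDS).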

@Golfer474
Author

Golfer474 commented Jun 25, 2020

@rnostream Thanks for the suggestions. I had already implemented option 2 (command-line args) before opening this issue, and will probably stick with that for the time being. That said, I feel the API should allow a Supplier to be passed in. Option 1 kind of defeats the purpose of the built-in windowing functionality.

@markheger
Member

Proposed API changes

Two new methods to declare a window using the value from a submission parameter:

    /**
     * Declare a {@link TWindow} that continually represents the last {@code time}
     * units (as given by {@code unit}) of tuples on this stream.
     * Same as {@link #last(long,TimeUnit)} except the {@code time} is
     * specified with a {@code Supplier<Long>} such as one created
     * by {@link Topology#createSubmissionParameter(String, Class)}.
     * 
     * @param time Time size of the window, in units of {@code unit}
     * @param unit Unit for {@code time}
     * @return Window on this stream representing the last {@code time} units of tuples.
     */
    TWindow<T,Object> last(Supplier<Long> time, TimeUnit unit);

    /**
     * Declare a {@link TWindow} that continually represents the last {@code count} tuples
     * seen on this stream.
     * Same as {@link #last(int)} except the {@code count} is
     * specified with a {@code Supplier<Integer>} such as one created
     * by {@link Topology#createSubmissionParameter(String, Class)}.
     * 
     * @param count Tuple size of the window
     * @return Window on this stream representing the last {@code count} tuples.
     */
    TWindow<T,Object> last(Supplier<Integer> count);

Similar API changes for the Python topology API.

markheger pushed a commit to markheger/streamsx.topology that referenced this issue Jul 6, 2020
markheger pushed a commit to markheger/streamsx.topology that referenced this issue Jul 7, 2020
@markheger
Member

markheger commented Jul 7, 2020

TWindow<T,java.lang.Object> last(Supplier<java.lang.Integer> count)
Declare a TWindow that continually represents the last count tuples seen on this stream.

TWindow<T,java.lang.Object> lastSeconds(Supplier<java.lang.Integer> time)
Declare a TWindow that continually represents the last time seconds of tuples on this stream.
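The summary above gives only the signatures. As a rough model of what last(Supplier<Integer> count) represents, and not of how the library implements windows, the hypothetical CountWindow class below keeps the most recent count tuples and reads the count from a Supplier only when tuples arrive. java.util.function.Supplier is used so the sketch is self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of "last count tuples" semantics with a runtime-supplied count.
class CountWindow<T> {
    private final Supplier<Integer> count; // read when tuples arrive, i.e. at runtime
    private final Deque<T> tuples = new ArrayDeque<>();

    CountWindow(Supplier<Integer> count) {
        this.count = count;
    }

    /** Adds a tuple and returns the last count tuples, oldest first. */
    List<T> add(T tuple) {
        int n = count.get(); // assumes the supplier returns a non-null value at runtime
        tuples.addLast(tuple);
        while (tuples.size() > n) {
            tuples.removeFirst(); // evict the oldest tuple
        }
        return new ArrayList<>(tuples);
    }
}
```

With a count of 2, adding "a", "b" and "c" returns [a], then [a, b], then [b, c].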
