
@Vancior (Contributor) commented Mar 21, 2022

What is the purpose of the change

Support WindowedStream.aggregate in the Python DataStream API, aligning it with the Java API, and update the docs accordingly.

Verifying this change

This change added tests and can be verified as follows:

  • Added Python integration tests for several usages of the WindowedStream.aggregate API

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot (Collaborator) commented Mar 21, 2022

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@dianfu (Contributor) left a comment

@Vancior Thanks a lot for the PR. I have left a few comments.

.aggregate(new AverageAggregate)
```
{{< /tab >}}
{{< tab "Python" >}}

The Chinese doc should also be updated: docs.zh/content/docs/dev/datastream/operators/windows.md

```python
class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):

    @abstractmethod

The method clear is missing. The Java/Scala example should also be updated.
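The reviewer's point can be checked with a plain-Python sketch that mirrors the two abstract methods the Python listing is expected to declare, process and clear. TimeWindow, Context, ProcessWindowFunctionSketch, CountPerWindow and MissingClear are hypothetical names introduced so the example runs without PyFlink; in particular, the dict standing in for per-window state is an assumption and not PyFlink's state API.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, Tuple


@dataclass(frozen=True)
class TimeWindow:
    """Hypothetical stand-in for a time window covering [start, end)."""
    start: int
    end: int


class Context:
    """Hypothetical stand-in for ProcessWindowFunction.Context.

    A plain dict plays the role of per-window state in this sketch.
    """

    def __init__(self, window: TimeWindow):
        self._window = window
        self.state: Dict[str, Any] = {}

    def window(self) -> TimeWindow:
        return self._window


class ProcessWindowFunctionSketch(ABC):
    """Mirrors the two abstract methods the doc listing should show."""

    @abstractmethod
    def process(self, key: Any, context: Context,
                elements: Iterable[Any]) -> Iterable[Any]:
        """Evaluates the window and emits zero or more results."""

    @abstractmethod
    def clear(self, context: Context) -> None:
        """Deletes any per-window state when the window expires."""


class CountPerWindow(ProcessWindowFunctionSketch):
    def process(self, key, context, elements) -> Iterator[Tuple[Any, int, int]]:
        count = sum(1 for _ in elements)
        # Store something in per-window state so clear() has work to do.
        context.state["count"] = count
        yield key, context.window().start, count

    def clear(self, context) -> None:
        context.state.clear()


class MissingClear(ProcessWindowFunctionSketch):
    """Omits clear(), so Python refuses to instantiate it."""

    def process(self, key, context, elements):
        return []
```

If clear is abstract in the real PyFlink class as well, a subclass that forgets it cannot be instantiated at all (MissingClear() raises TypeError here), so the documented class signature should list it.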

input \
.key_by(<key selector>) \
.window(<window assigner>) \
.reduce(lambda v1, v2: (v1[0], v1[1] + v2[1]),

The implementation is incorrect. It matches neither the Java/Scala example nor the description of this section: "The following example shows how an incremental ReduceFunction can be combined with a ProcessWindowFunction to return the smallest event in a window along with the start time of the window."
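For comparison, the behaviour this section describes (reduce incrementally to the smallest event, then emit it together with the window start) can be sketched in plain Python. SensorReading, TimeWindow, min_reading, process_min and evaluate_window are hypothetical names; functools.reduce stands in for Flink folding each arriving element into the running result, and the comparison keeps the reading with the smaller value, as the description requires.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Iterable, Iterator, List, Tuple


@dataclass(frozen=True)
class SensorReading:
    """Hypothetical event type used only for this sketch."""
    id: str
    timestamp: int
    value: float


@dataclass(frozen=True)
class TimeWindow:
    """Hypothetical stand-in for a time window covering [start, end)."""
    start: int
    end: int


def min_reading(r1: SensorReading, r2: SensorReading) -> SensorReading:
    # Reduce step: keep the reading with the smaller value (ties keep r1).
    return r2 if r1.value > r2.value else r1


def process_min(key: str, window: TimeWindow,
                min_readings: Iterable[SensorReading]) -> Iterator[Tuple[int, SensorReading]]:
    # After incremental reduction the iterable holds exactly one element.
    smallest = next(iter(min_readings))
    yield window.start, smallest


def evaluate_window(key: str, window: TimeWindow,
                    events: List[SensorReading]) -> List[Tuple[int, SensorReading]]:
    """Hypothetical driver: reduce the events, then hand the single result to process_min."""
    reduced = reduce(min_reading, events)
    return list(process_min(key, window, [reduced]))
```

The point of this structure is that process_min never sees the raw events: after incremental reduction its input holds a single element, which is what keeps the ReduceFunction plus ProcessWindowFunction combination cheap in window state.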

input \
.key_by(<key selector>) \
.window(<window assigner>) \
.apply(new MyWindowFunction())

Suggested change
.apply(new MyWindowFunction())
.apply(MyWindowFunction())
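The suggestion drops new because Python has no new keyword; calling the class is what creates the instance. A minimal plain-Python sketch of a window function passed this way follows. WindowFunctionSketch, TimeWindow and the counting body of MyWindowFunction are assumptions for illustration, and the apply(key, window, inputs) shape, returning results instead of writing to a Collector, is assumed to match the Python WindowFunction.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Iterable, List


@dataclass(frozen=True)
class TimeWindow:
    """Hypothetical stand-in for a time window covering [start, end)."""
    start: int
    end: int


class WindowFunctionSketch(ABC):
    """Hypothetical mirror of a window function's apply() contract."""

    @abstractmethod
    def apply(self, key: Any, window: TimeWindow,
              inputs: Iterable[Any]) -> Iterable[Any]:
        """Called once per window with all buffered elements of that window."""


class MyWindowFunction(WindowFunctionSketch):
    def apply(self, key, window, inputs) -> List[str]:
        # Count the buffered elements and emit one summary string per window.
        count = sum(1 for _ in inputs)
        return [f"Window: {window} count: {count}"]


# No `new`: calling the class constructs the instance, so the argument
# to .apply(...) in the doc is simply MyWindowFunction().
window_function = MyWindowFunction()
```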

aggregation function.
:param result_type: Type information for the result type of the window function.
:return: The data stream that is the result of applying the window function to the window.
"""

.. versionadded:: 1.16.0

aggregate_function: AggregateFunction,
window_function: Union[WindowFunction, ProcessWindowFunction] = None,
accumulator_type: TypeInformation = None,
result_type: TypeInformation = None) -> DataStream:

Suggested change
result_type: TypeInformation = None) -> DataStream:
output_type: TypeInformation = None) -> DataStream:

Keep it consistent with the other methods.

Arriving data is incrementally aggregated using the given aggregate function. This means
that the window function typically has only a single value to process when called.

What about showing a simple example of this API?
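As a possible answer to this request, the incremental behaviour described in the docstring can be sketched in plain Python. AverageAggregate follows the four-method shape of AggregateFunction (create_accumulator, add, get_result, merge) and averages the second field of (key, value) tuples with a (sum, count) accumulator; AggregateFunctionSketch, fire_window and tag_with_key are hypothetical helpers that stand in for the PyFlink base class, the window operator and the optional window function.

```python
from abc import ABC, abstractmethod
from typing import Callable, List, Tuple


class AggregateFunctionSketch(ABC):
    """Hypothetical plain-Python mirror of the four AggregateFunction methods."""

    @abstractmethod
    def create_accumulator(self): ...

    @abstractmethod
    def add(self, value, accumulator): ...

    @abstractmethod
    def get_result(self, accumulator): ...

    @abstractmethod
    def merge(self, acc_a, acc_b): ...


class AverageAggregate(AggregateFunctionSketch):
    """Averages the second field of (key, value) tuples; accumulator is (sum, count)."""

    def create_accumulator(self) -> Tuple[int, int]:
        return 0, 0

    def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) -> Tuple[int, int]:
        return accumulator[0] + value[1], accumulator[1] + 1

    def get_result(self, accumulator: Tuple[int, int]) -> float:
        return accumulator[0] / accumulator[1]

    def merge(self, acc_a: Tuple[int, int], acc_b: Tuple[int, int]) -> Tuple[int, int]:
        return acc_a[0] + acc_b[0], acc_a[1] + acc_b[1]


def fire_window(agg: AggregateFunctionSketch,
                window_fn: Callable[[str, List[float]], Tuple[str, float]],
                key: str, elements: List[Tuple[str, int]]) -> Tuple[str, float]:
    """Hypothetical driver for one window: fold each arriving element into the
    accumulator, then call the window function with the single aggregated value."""
    acc = agg.create_accumulator()
    for element in elements:  # incremental: one add() per arriving element
        acc = agg.add(element, acc)
    return window_fn(key, [agg.get_result(acc)])


def tag_with_key(key: str, averages: List[float]) -> Tuple[str, float]:
    # Like a window function after incremental aggregation, this sees exactly one value.
    (average,) = averages
    return key, average
```

The window function receives a list holding exactly one element, which is what "typically has only a single value to process" means in practice. With the signature under review, the PyFlink call would presumably take the form .aggregate(AverageAggregate(), window_function=..., accumulator_type=..., output_type=...), with AverageAggregate subclassing PyFlink's AggregateFunction instead of the sketch base.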

@Vancior force-pushed the feat/window_aggregate branch from 68fab99 to 17733c3 on March 23, 2022 02:13
@dianfu (Contributor) commented Mar 23, 2022

@flinkbot run azure

@dianfu (Contributor) left a comment

LGTM.

@dianfu closed this in 137ed3e on Mar 23, 2022
cun8cun8 pushed a commit to cun8cun8/flink that referenced this pull request Mar 28, 2022
cun8cun8 pushed a commit to cun8cun8/flink that referenced this pull request Mar 28, 2022
cun8cun8 pushed a commit to cun8cun8/flink that referenced this pull request Mar 28, 2022
JasonLeeCoding pushed a commit to JasonLeeCoding/flink that referenced this pull request May 27, 2022
@Vancior deleted the feat/window_aggregate branch on June 6, 2022 06:43
zstraw pushed a commit to zstraw/flink that referenced this pull request Jul 4, 2022
4 participants